[Python-checkins] bpo-35805: Add parser for Message-ID email header. (GH-13397)

Barry Warsaw webhook-mailer at python.org
Tue Jun 4 13:41:41 EDT 2019


https://github.com/python/cpython/commit/46d88a113142b26c01c95c93846a89318ba87ffc
commit: 46d88a113142b26c01c95c93846a89318ba87ffc
branch: master
author: Abhilash Raj <maxking at users.noreply.github.com>
committer: Barry Warsaw <barry at python.org>
date: 2019-06-04T10:41:34-07:00
summary:

bpo-35805: Add parser for Message-ID email header. (GH-13397)

* bpo-35805: Add parser for Message-ID header.

This parser is based on the definition of Identification Fields from RFC 5322
Sec 3.6.4.

This should also prevent folding of Message-ID header using RFC 2047 encoded
words and hence fix bpo-35805.

* Prevent folding of non-ascii message-id headers.
* Add fold method to MsgID token to prevent folding.

files:
A Misc/NEWS.d/next/Library/2019-05-17-15-11-08.bpo-35805.E4YwYz.rst
M Doc/library/email.headerregistry.rst
M Lib/email/_header_value_parser.py
M Lib/email/headerregistry.py
M Lib/test/test_email/test__header_value_parser.py
M Lib/test/test_email/test_headerregistry.py

diff --git a/Doc/library/email.headerregistry.rst b/Doc/library/email.headerregistry.rst
index c3ce90c2d6d9..9376da2b8d39 100644
--- a/Doc/library/email.headerregistry.rst
+++ b/Doc/library/email.headerregistry.rst
@@ -321,19 +321,26 @@ variant, :attr:`~.BaseHeader.max_count` is set to 1.
 
     The default mappings are:
 
-      :subject:         UniqueUnstructuredHeader
-      :date:            UniqueDateHeader
-      :resent-date:     DateHeader
-      :orig-date:       UniqueDateHeader
-      :sender:          UniqueSingleAddressHeader
-      :resent-sender:   SingleAddressHeader
-      :to:              UniqueAddressHeader
-      :resent-to:       AddressHeader
-      :cc:              UniqueAddressHeader
-      :resent-cc:       AddressHeader
-      :from:            UniqueAddressHeader
-      :resent-from:     AddressHeader
-      :reply-to:        UniqueAddressHeader
+      :subject:                   UniqueUnstructuredHeader
+      :date:                      UniqueDateHeader
+      :resent-date:               DateHeader
+      :orig-date:                 UniqueDateHeader
+      :sender:                    UniqueSingleAddressHeader
+      :resent-sender:             SingleAddressHeader
+      :to:                        UniqueAddressHeader
+      :resent-to:                 AddressHeader
+      :cc:                        UniqueAddressHeader
+      :resent-cc:                 AddressHeader
+      :bcc:                       UniqueAddressHeader
+      :resent-bcc:                AddressHeader
+      :from:                      UniqueAddressHeader
+      :resent-from:               AddressHeader
+      :reply-to:                  UniqueAddressHeader
+      :mime-version:              MIMEVersionHeader
+      :content-type:              ContentTypeHeader
+      :content-disposition:       ContentDispositionHeader
+      :content-transfer-encoding: ContentTransferEncodingHeader
+      :message-id:                MessageIDHeader
 
     ``HeaderRegistry`` has the following methods:
 
diff --git a/Lib/email/_header_value_parser.py b/Lib/email/_header_value_parser.py
index 14cc00c61e07..34969ab59151 100644
--- a/Lib/email/_header_value_parser.py
+++ b/Lib/email/_header_value_parser.py
@@ -179,37 +179,30 @@ def comments(self):
 
 
 class UnstructuredTokenList(TokenList):
-
     token_type = 'unstructured'
 
 
 class Phrase(TokenList):
-
     token_type = 'phrase'
 
 class Word(TokenList):
-
     token_type = 'word'
 
 
 class CFWSList(WhiteSpaceTokenList):
-
     token_type = 'cfws'
 
 
 class Atom(TokenList):
-
     token_type = 'atom'
 
 
 class Token(TokenList):
-
     token_type = 'token'
     encode_as_ew = False
 
 
 class EncodedWord(TokenList):
-
     token_type = 'encoded-word'
     cte = None
     charset = None
@@ -496,16 +489,19 @@ def domain(self):
 
 
 class DotAtom(TokenList):
-
     token_type = 'dot-atom'
 
 
 class DotAtomText(TokenList):
-
     token_type = 'dot-atom-text'
     as_ew_allowed = True
 
 
+class NoFoldLiteral(TokenList):
+    token_type = 'no-fold-literal'
+    as_ew_allowed = False
+
+
 class AddrSpec(TokenList):
 
     token_type = 'addr-spec'
@@ -809,7 +805,6 @@ def params(self):
 
 
 class ContentType(ParameterizedHeaderValue):
-
     token_type = 'content-type'
     as_ew_allowed = False
     maintype = 'text'
@@ -817,27 +812,35 @@ class ContentType(ParameterizedHeaderValue):
 
 
 class ContentDisposition(ParameterizedHeaderValue):
-
     token_type = 'content-disposition'
     as_ew_allowed = False
     content_disposition = None
 
 
 class ContentTransferEncoding(TokenList):
-
     token_type = 'content-transfer-encoding'
     as_ew_allowed = False
     cte = '7bit'
 
 
 class HeaderLabel(TokenList):
-
     token_type = 'header-label'
     as_ew_allowed = False
 
 
-class Header(TokenList):
+class MsgID(TokenList):
+    token_type = 'msg-id'
+    as_ew_allowed = False
+
+    def fold(self, policy):
+        # message-id tokens may not be folded.
+        return str(self) + policy.linesep
+
+class MessageID(MsgID):
+    token_type = 'message-id'
 
+
+class Header(TokenList):
     token_type = 'header'
 
 
@@ -1583,7 +1586,7 @@ def get_addr_spec(value):
     addr_spec.append(token)
     if not value or value[0] != '@':
         addr_spec.defects.append(errors.InvalidHeaderDefect(
-            "add-spec local part with no domain"))
+            "addr-spec local part with no domain"))
         return addr_spec, value
     addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
     token, value = get_domain(value[1:])
@@ -1968,6 +1971,110 @@ def get_address_list(value):
             value = value[1:]
     return address_list, value
 
+
+def get_no_fold_literal(value):
+    """ no-fold-literal = "[" *dtext "]"
+    """
+    no_fold_literal = NoFoldLiteral()
+    if not value:
+        raise errors.HeaderParseError(
+            "expected no-fold-literal but found '{}'".format(value))
+    if value[0] != '[':
+        raise errors.HeaderParseError(
+            "expected '[' at the start of no-fold-literal "
+            "but found '{}'".format(value))
+    no_fold_literal.append(ValueTerminal('[', 'no-fold-literal-start'))
+    value = value[1:]
+    token, value = get_dtext(value)
+    no_fold_literal.append(token)
+    if not value or value[0] != ']':
+        raise errors.HeaderParseError(
+            "expected ']' at the end of no-fold-literal "
+            "but found '{}'".format(value))
+    no_fold_literal.append(ValueTerminal(']', 'no-fold-literal-end'))
+    return no_fold_literal, value[1:]
+
+def get_msg_id(value):
+    """msg-id = [CFWS] "<" id-left '@' id-right  ">" [CFWS]
+       id-left = dot-atom-text / obs-id-left
+       id-right = dot-atom-text / no-fold-literal / obs-id-right
+       no-fold-literal = "[" *dtext "]"
+    """
+    msg_id = MsgID()
+    if value[0] in CFWS_LEADER:
+        token, value = get_cfws(value)
+        msg_id.append(token)
+    if not value or value[0] != '<':
+        raise errors.HeaderParseError(
+            "expected msg-id but found '{}'".format(value))
+    msg_id.append(ValueTerminal('<', 'msg-id-start'))
+    value = value[1:]
+    # Parse id-left.
+    try:
+        token, value = get_dot_atom_text(value)
+    except errors.HeaderParseError:
+        try:
+            # obs-id-left is same as local-part of add-spec.
+            token, value = get_obs_local_part(value)
+            msg_id.defects.append(errors.ObsoleteHeaderDefect(
+                "obsolete id-left in msg-id"))
+        except errors.HeaderParseError:
+            raise errors.HeaderParseError(
+                "expected dot-atom-text or obs-id-left"
+                " but found '{}'".format(value))
+    msg_id.append(token)
+    if not value or value[0] != '@':
+        msg_id.defects.append(errors.InvalidHeaderDefect(
+            "msg-id with no id-right"))
+        # Even though there is no id-right, if the local part
+        # ends with `>` let's just parse it too and return
+        # along with the defect.
+        if value and value[0] == '>':
+            msg_id.append(ValueTerminal('>', 'msg-id-end'))
+            value = value[1:]
+        return msg_id, value
+    msg_id.append(ValueTerminal('@', 'address-at-symbol'))
+    value = value[1:]
+    # Parse id-right.
+    try:
+        token, value = get_dot_atom_text(value)
+    except errors.HeaderParseError:
+        try:
+            token, value = get_no_fold_literal(value)
+        except errors.HeaderParseError as e:
+            try:
+                token, value = get_domain(value)
+                msg_id.defects.append(errors.ObsoleteHeaderDefect(
+                    "obsolete id-right in msg-id"))
+            except errors.HeaderParseError:
+                raise errors.HeaderParseError(
+                    "expected dot-atom-text, no-fold-literal or obs-id-right"
+                    " but found '{}'".format(value))
+    msg_id.append(token)
+    if value and value[0] == '>':
+        value = value[1:]
+    else:
+        msg_id.defects.append(errors.InvalidHeaderDefect(
+            "missing trailing '>' on msg-id"))
+    msg_id.append(ValueTerminal('>', 'msg-id-end'))
+    if value and value[0] in CFWS_LEADER:
+        token, value = get_cfws(value)
+        msg_id.append(token)
+    return msg_id, value
+
+
+def parse_message_id(value):
+    """message-id      =   "Message-ID:" msg-id CRLF
+    """
+    message_id = MessageID()
+    try:
+        token, value = get_msg_id(value)
+    except errors.HeaderParseError:
+        message_id.defects.append(errors.InvalidHeaderDefect(
+            "Expected msg-id but found {!r}".format(value)))
+    message_id.append(token)
+    return message_id
+
 #
 # XXX: As I begin to add additional header parsers, I'm realizing we probably
 # have two level of parser routines: the get_XXX methods that get a token in
diff --git a/Lib/email/headerregistry.py b/Lib/email/headerregistry.py
index 00652049f2fa..452c6ad50846 100644
--- a/Lib/email/headerregistry.py
+++ b/Lib/email/headerregistry.py
@@ -520,6 +520,18 @@ def cte(self):
         return self._cte
 
 
+class MessageIDHeader:
+
+    max_count = 1
+    value_parser = staticmethod(parser.parse_message_id)
+
+    @classmethod
+    def parse(cls, value, kwds):
+        kwds['parse_tree'] = parse_tree = cls.value_parser(value)
+        kwds['decoded'] = str(parse_tree)
+        kwds['defects'].extend(parse_tree.all_defects)
+
+
 # The header factory #
 
 _default_header_map = {
@@ -542,6 +554,7 @@ def cte(self):
     'content-type':                 ContentTypeHeader,
     'content-disposition':          ContentDispositionHeader,
     'content-transfer-encoding':    ContentTransferEncodingHeader,
+    'message-id':                   MessageIDHeader,
     }
 
 class HeaderRegistry:
diff --git a/Lib/test/test_email/test__header_value_parser.py b/Lib/test/test_email/test__header_value_parser.py
index 676732bb3d02..12da3cffb84c 100644
--- a/Lib/test/test_email/test__header_value_parser.py
+++ b/Lib/test/test_email/test__header_value_parser.py
@@ -2494,6 +2494,78 @@ def test_invalid_content_transfer_encoding(self):
             ";foo", ";foo", ";foo", [errors.InvalidHeaderDefect]*3
         )
 
+    # get_msg_id
+
+    def test_get_msg_id_valid(self):
+        msg_id = self._test_get_x(
+            parser.get_msg_id,
+            "<simeple.local at example.something.com>",
+            "<simeple.local at example.something.com>",
+            "<simeple.local at example.something.com>",
+            [],
+            '',
+            )
+        self.assertEqual(msg_id.token_type, 'msg-id')
+
+    def test_get_msg_id_obsolete_local(self):
+        msg_id = self._test_get_x(
+            parser.get_msg_id,
+            '<"simeple.local"@example.com>',
+            '<"simeple.local"@example.com>',
+            '<simeple.local at example.com>',
+            [errors.ObsoleteHeaderDefect],
+            '',
+            )
+        self.assertEqual(msg_id.token_type, 'msg-id')
+
+    def test_get_msg_id_non_folding_literal_domain(self):
+        msg_id = self._test_get_x(
+            parser.get_msg_id,
+            "<simple.local@[someexamplecom.domain]>",
+            "<simple.local@[someexamplecom.domain]>",
+            "<simple.local@[someexamplecom.domain]>",
+            [],
+            "",
+            )
+        self.assertEqual(msg_id.token_type, 'msg-id')
+
+
+    def test_get_msg_id_obsolete_domain_part(self):
+        msg_id = self._test_get_x(
+            parser.get_msg_id,
+            "<simplelocal@(old)example.com>",
+            "<simplelocal@(old)example.com>",
+            "<simplelocal@ example.com>",
+            [errors.ObsoleteHeaderDefect],
+            ""
+        )
+
+    def test_get_msg_id_no_id_right_part(self):
+        msg_id = self._test_get_x(
+            parser.get_msg_id,
+            "<simplelocal>",
+            "<simplelocal>",
+            "<simplelocal>",
+            [errors.InvalidHeaderDefect],
+            ""
+        )
+        self.assertEqual(msg_id.token_type, 'msg-id')
+
+    def test_get_msg_id_no_angle_start(self):
+        with self.assertRaises(errors.HeaderParseError):
+            parser.get_msg_id("msgwithnoankle")
+
+    def test_get_msg_id_no_angle_end(self):
+        msg_id = self._test_get_x(
+            parser.get_msg_id,
+            "<simplelocal at domain",
+            "<simplelocal at domain>",
+            "<simplelocal at domain>",
+            [errors.InvalidHeaderDefect],
+            ""
+        )
+        self.assertEqual(msg_id.token_type, 'msg-id')
+
 
 @parameterize
 class Test_parse_mime_parameters(TestParserMixin, TestEmailBase):
diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py
index d1007099f666..75505460aba8 100644
--- a/Lib/test/test_email/test_headerregistry.py
+++ b/Lib/test/test_email/test_headerregistry.py
@@ -1648,6 +1648,34 @@ def test_fold_overlong_words_using_RFC2047(self):
                 'xxxxxxxxxxxxxxxxxxxx=3D=3D-xxx-xx-xx?=\n'
             ' =?utf-8?q?=3E?=\n')
 
+    def test_message_id_header_is_not_folded(self):
+        h = self.make_header(
+            'Message-ID',
+            '<somemessageidlongerthan at maxlinelength.com>')
+        self.assertEqual(
+            h.fold(policy=policy.default.clone(max_line_length=20)),
+            'Message-ID: <somemessageidlongerthan at maxlinelength.com>\n')
+
+        # Test message-id isn't folded when id-right is no-fold-literal.
+        h = self.make_header(
+            'Message-ID',
+            '<somemessageidlongerthan@[127.0.0.0.0.0.0.0.0.1]>')
+        self.assertEqual(
+            h.fold(policy=policy.default.clone(max_line_length=20)),
+            'Message-ID: <somemessageidlongerthan@[127.0.0.0.0.0.0.0.0.1]>\n')
+
+        # Test message-id isn't folded when id-right is non-ascii characters.
+        h = self.make_header('Message-ID', '<ईमेल@wők.com>')
+        self.assertEqual(
+            h.fold(policy=policy.default.clone(max_line_length=30)),
+            'Message-ID: <ईमेल@wők.com>\n')
+
+        # Test message-id is folded without breaking the msg-id token into
+        # encoded words, *even* if they don't fit into max_line_length.
+        h = self.make_header('Message-ID', '<ईमेलfromMessage at wők.com>')
+        self.assertEqual(
+            h.fold(policy=policy.default.clone(max_line_length=20)),
+            'Message-ID:\n <ईमेलfromMessage at wők.com>\n')
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2019-05-17-15-11-08.bpo-35805.E4YwYz.rst b/Misc/NEWS.d/next/Library/2019-05-17-15-11-08.bpo-35805.E4YwYz.rst
new file mode 100644
index 000000000000..2d8c8b3222d0
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-05-17-15-11-08.bpo-35805.E4YwYz.rst
@@ -0,0 +1,2 @@
+Add parser for Message-ID header and add it to default HeaderRegistry. This
+should prevent folding of Message-ID using RFC 2048 encoded words.



More information about the Python-checkins mailing list