[Python-checkins] cpython: #1672568: email now registers defects for base64 payload format errors.

r.david.murray python-checkins at python.org
Mon May 28 03:24:23 CEST 2012


http://hg.python.org/cpython/rev/17341b51af4f
changeset:   77192:17341b51af4f
user:        R David Murray <rdmurray at bitdance.com>
date:        Sun May 27 21:23:34 2012 -0400
summary:
  #1672568: email now registers defects for base64 payload format errors.

Which also means that it is now producing *something* for any base64
payload, which is what leads to the couple of older test changes in
test_email.  This is a slightly backward incompatible behavior change,
but the new behavior is so much more useful than the old (you can now
*reliably* detect errors, and any program that was detecting errors by
sniffing for a base64 return from get_payload(decode=True) and then doing
its own error-recovery decode will just get the error-recovery decode
right away).  So this seems to me to be worth the small risk inherent
in this behavior change.

This patch also refactors the defect tests into a separate test file,
since they are no longer just parser tests.

files:
  Doc/library/email.errors.rst                |    7 +
  Doc/library/email.message.rst               |    8 +-
  Lib/email/message.py                        |   12 +-
  Lib/test/test_email/test_defect_handling.py |  304 ++++++++++
  Lib/test/test_email/test_email.py           |   32 +-
  Lib/test/test_email/test_parser.py          |  256 --------
  6 files changed, 344 insertions(+), 275 deletions(-)


diff --git a/Doc/library/email.errors.rst b/Doc/library/email.errors.rst
--- a/Doc/library/email.errors.rst
+++ b/Doc/library/email.errors.rst
@@ -96,3 +96,10 @@
   this defect, its :meth:`is_multipart` method may return false even though its
   content type claims to be :mimetype:`multipart`.
 
+* :class:`InvalidBase64PaddingDefect` -- When decoding a block of base64
+  encoded bytes, the padding was not correct.  Enough padding is added to
+  perform the decode, but the resulting decoded bytes may be invalid.
+
+* :class:`InvalidBase64CharactersDefect` -- When decoding a block of base64
+  encoded bytes, characters outside the base64 alphabet were encountered.
+  The characters are ignored, but the resulting decoded bytes may be invalid.
diff --git a/Doc/library/email.message.rst b/Doc/library/email.message.rst
--- a/Doc/library/email.message.rst
+++ b/Doc/library/email.message.rst
@@ -111,10 +111,14 @@
       header. When ``True`` and the message is not a multipart, the payload will
       be decoded if this header's value is ``quoted-printable`` or ``base64``.
       If some other encoding is used, or :mailheader:`Content-Transfer-Encoding`
-      header is missing, or if the payload has bogus base64 data, the payload is
+      header is missing, the payload is
       returned as-is (undecoded).  In all cases the returned value is binary
       data.  If the message is a multipart and the *decode* flag is ``True``,
-      then ``None`` is returned.
+      then ``None`` is returned.  If the payload is base64 and it was not
+      perfectly formed (missing padding, characters outside the base64
+      alphabet), then an appropriate defect will be added to the message's
+      defect property (:class:`~email.errors.InvalidBase64PaddingDefect` or
+      :class:`~email.errors.InvalidBase64CharactersDefect`, respectively).
 
       When *decode* is ``False`` (the default) the body is returned as a string
       without decoding the :mailheader:`Content-Transfer-Encoding`.  However,
diff --git a/Lib/email/message.py b/Lib/email/message.py
--- a/Lib/email/message.py
+++ b/Lib/email/message.py
@@ -17,6 +17,7 @@
 from email import errors
 from email._policybase import compat32
 from email import charset as _charset
+from email._encoded_words import decode_b
 Charset = _charset.Charset
 
 SEMISPACE = '; '
@@ -249,11 +250,12 @@
         if cte == 'quoted-printable':
             return utils._qdecode(bpayload)
         elif cte == 'base64':
-            try:
-                return base64.b64decode(bpayload)
-            except binascii.Error:
-                # Incorrect padding
-                return bpayload
+            # XXX: this is a bit of a hack; decode_b should probably be factored
+            # out somewhere, but I haven't figured out where yet.
+            value, defects = decode_b(b''.join(bpayload.splitlines()))
+            for defect in defects:
+                self.policy.handle_defect(self, defect)
+            return value
         elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
             in_file = BytesIO(bpayload)
             out_file = BytesIO()
diff --git a/Lib/test/test_email/test_defect_handling.py b/Lib/test/test_email/test_defect_handling.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_email/test_defect_handling.py
@@ -0,0 +1,304 @@
+import textwrap
+import unittest
+from email._policybase import Compat32
+from email import errors
+from test.test_email import TestEmailBase
+
+
+class TestMessageDefectDetectionBase:
+
+    dup_boundary_msg = textwrap.dedent("""\
+        Subject: XX
+        From: xx at xx.dk
+        To: XX
+        Mime-version: 1.0
+        Content-type: multipart/mixed;
+           boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+        --MS_Mac_OE_3071477847_720252_MIME_Part
+        Content-type: multipart/alternative;
+           boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+        --MS_Mac_OE_3071477847_720252_MIME_Part
+        Content-type: text/plain; charset="ISO-8859-1"
+        Content-transfer-encoding: quoted-printable
+
+        text
+
+        --MS_Mac_OE_3071477847_720252_MIME_Part
+        Content-type: text/html; charset="ISO-8859-1"
+        Content-transfer-encoding: quoted-printable
+
+        <HTML></HTML>
+
+        --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+        --MS_Mac_OE_3071477847_720252_MIME_Part
+        Content-type: image/gif; name="xx.gif";
+        Content-disposition: attachment
+        Content-transfer-encoding: base64
+
+        Some removed base64 encoded chars.
+
+        --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+        """)
+
+    def test_same_boundary_inner_outer(self):
+        # XXX better would be to actually detect the duplicate.
+        msg = self._str_msg(self.dup_boundary_msg)
+        inner = msg.get_payload(0)
+        self.assertTrue(hasattr(inner, 'defects'))
+        self.assertEqual(len(self.get_defects(inner)), 1)
+        self.assertTrue(isinstance(self.get_defects(inner)[0],
+                                   errors.StartBoundaryNotFoundDefect))
+
+    def test_same_boundary_inner_outer_raises_on_defect(self):
+        with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+            self._str_msg(self.dup_boundary_msg,
+                policy=self.policy.clone(raise_on_defect=True))
+
+    no_boundary_msg = textwrap.dedent("""\
+        Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
+        From: foobar
+        Subject: broken mail
+        MIME-Version: 1.0
+        Content-Type: multipart/report; report-type=delivery-status;
+
+        --JAB03225.986577786/zinfandel.lacita.com
+
+        One part
+
+        --JAB03225.986577786/zinfandel.lacita.com
+        Content-Type: message/delivery-status
+
+        Header: Another part
+
+        --JAB03225.986577786/zinfandel.lacita.com--
+        """)
+
+    def test_multipart_no_boundary(self):
+        msg = self._str_msg(self.no_boundary_msg)
+        self.assertTrue(isinstance(msg.get_payload(), str))
+        self.assertEqual(len(self.get_defects(msg)), 2)
+        self.assertTrue(isinstance(self.get_defects(msg)[0],
+                                   errors.NoBoundaryInMultipartDefect))
+        self.assertTrue(isinstance(self.get_defects(msg)[1],
+                                   errors.MultipartInvariantViolationDefect))
+
+    def test_multipart_no_boundary_raise_on_defect(self):
+        with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+            self._str_msg(self.no_boundary_msg,
+                policy=self.policy.clone(raise_on_defect=True))
+
+    multipart_msg = textwrap.dedent("""\
+        Date: Wed, 14 Nov 2007 12:56:23 GMT
+        From: foo at bar.invalid
+        To: foo at bar.invalid
+        Subject: Content-Transfer-Encoding: base64 and multipart
+        MIME-Version: 1.0
+        Content-Type: multipart/mixed;
+            boundary="===============3344438784458119861=="{}
+
+        --===============3344438784458119861==
+        Content-Type: text/plain
+
+        Test message
+
+        --===============3344438784458119861==
+        Content-Type: application/octet-stream
+        Content-Transfer-Encoding: base64
+
+        YWJj
+
+        --===============3344438784458119861==--
+        """)
+
+    def test_multipart_invalid_cte(self):
+        msg = self._str_msg(
+            self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
+        self.assertEqual(len(self.get_defects(msg)), 1)
+        self.assertIsInstance(self.get_defects(msg)[0],
+            errors.InvalidMultipartContentTransferEncodingDefect)
+
+    def test_multipart_invalid_cte_raise_on_defect(self):
+        with self.assertRaises(
+                errors.InvalidMultipartContentTransferEncodingDefect):
+            self._str_msg(
+                self.multipart_msg.format(
+                    "\nContent-Transfer-Encoding: base64"),
+                policy=self.policy.clone(raise_on_defect=True))
+
+    def test_multipart_no_cte_no_defect(self):
+        msg = self._str_msg(self.multipart_msg.format(''))
+        self.assertEqual(len(self.get_defects(msg)), 0)
+
+    def test_multipart_valid_cte_no_defect(self):
+        for cte in ('7bit', '8bit', 'BINary'):
+            msg = self._str_msg(
+                self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
+            self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
+
+    lying_multipart_msg = textwrap.dedent("""\
+        From: "Allison Dunlap" <xxx at example.com>
+        To: yyy at example.com
+        Subject: 64423
+        Date: Sun, 11 Jul 2004 16:09:27 -0300
+        MIME-Version: 1.0
+        Content-Type: multipart/alternative;
+
+        Blah blah blah
+        """)
+
+    def test_lying_multipart(self):
+        msg = self._str_msg(self.lying_multipart_msg)
+        self.assertTrue(hasattr(msg, 'defects'))
+        self.assertEqual(len(self.get_defects(msg)), 2)
+        self.assertTrue(isinstance(self.get_defects(msg)[0],
+                                   errors.NoBoundaryInMultipartDefect))
+        self.assertTrue(isinstance(self.get_defects(msg)[1],
+                                   errors.MultipartInvariantViolationDefect))
+
+    def test_lying_multipart_raise_on_defect(self):
+        with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+            self._str_msg(self.lying_multipart_msg,
+                policy=self.policy.clone(raise_on_defect=True))
+
+    missing_start_boundary_msg = textwrap.dedent("""\
+        Content-Type: multipart/mixed; boundary="AAA"
+        From: Mail Delivery Subsystem <xxx at example.com>
+        To: yyy at example.com
+
+        --AAA
+
+        Stuff
+
+        --AAA
+        Content-Type: message/rfc822
+
+        From: webmaster at python.org
+        To: zzz at example.com
+        Content-Type: multipart/mixed; boundary="BBB"
+
+        --BBB--
+
+        --AAA--
+
+        """)
+
+    def test_missing_start_boundary(self):
+        # The message structure is:
+        #
+        # multipart/mixed
+        #    text/plain
+        #    message/rfc822
+        #        multipart/mixed [*]
+        #
+        # [*] This message is missing its start boundary
+        outer = self._str_msg(self.missing_start_boundary_msg)
+        bad = outer.get_payload(1).get_payload(0)
+        self.assertEqual(len(self.get_defects(bad)), 1)
+        self.assertTrue(isinstance(self.get_defects(bad)[0],
+                                   errors.StartBoundaryNotFoundDefect))
+
+    def test_missing_start_boundary_raise_on_defect(self):
+        with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+            self._str_msg(self.missing_start_boundary_msg,
+                          policy=self.policy.clone(raise_on_defect=True))
+
+    def test_first_line_is_continuation_header(self):
+        msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
+        self.assertEqual(msg.keys(), ['Subject'])
+        self.assertEqual(msg.get_payload(), 'body')
+        self.assertEqual(len(self.get_defects(msg)), 1)
+        self.assertDefectsEqual(self.get_defects(msg),
+                                 [errors.FirstHeaderLineIsContinuationDefect])
+        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
+
+    def test_first_line_is_continuation_header_raise_on_defect(self):
+        with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
+            self._str_msg(' Line 1\nSubject: test\n\nbody\n',
+                          policy=self.policy.clone(raise_on_defect=True))
+
+    def test_missing_header_body_separator(self):
+        # Our heuristic if we see a line that doesn't look like a header (no
+        # leading whitespace but no ':') is to assume that the blank line that
+        # separates the header from the body is missing, and to stop parsing
+        # headers and start parsing the body.
+        msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
+        self.assertEqual(msg.keys(), ['Subject'])
+        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
+        self.assertDefectsEqual(self.get_defects(msg),
+                                [errors.MissingHeaderBodySeparatorDefect])
+
+    def test_missing_header_body_separator_raise_on_defect(self):
+        with self.assertRaises(errors.MissingHeaderBodySeparatorDefect):
+            self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
+                          policy=self.policy.clone(raise_on_defect=True))
+
+    badly_padded_base64_payload = textwrap.dedent("""\
+        Subject: test
+        MIME-Version: 1.0
+        Content-Type: text/plain; charset="utf-8"
+        Content-Transfer-Encoding: base64
+
+        dmk
+        """)
+
+    def test_bad_padding_in_base64_payload(self):
+        msg = self._str_msg(self.badly_padded_base64_payload)
+        self.assertEqual(msg.get_payload(decode=True), b'vi')
+        self.assertDefectsEqual(self.get_defects(msg),
+                                [errors.InvalidBase64PaddingDefect])
+
+    def test_bad_padding_in_base64_payload_raise_on_defect(self):
+        msg = self._str_msg(self.badly_padded_base64_payload,
+                            policy=self.policy.clone(raise_on_defect=True))
+        with self.assertRaises(errors.InvalidBase64PaddingDefect):
+            msg.get_payload(decode=True)
+
+    invalid_chars_in_base64_payload = textwrap.dedent("""\
+        Subject: test
+        MIME-Version: 1.0
+        Content-Type: text/plain; charset="utf-8"
+        Content-Transfer-Encoding: base64
+
+        dm\x01k===
+        """)
+
+    def test_invalid_chars_in_base64_payload(self):
+        msg = self._str_msg(self.invalid_chars_in_base64_payload)
+        self.assertEqual(msg.get_payload(decode=True), b'vi')
+        self.assertDefectsEqual(self.get_defects(msg),
+                                [errors.InvalidBase64CharactersDefect])
+
+    def test_invalid_chars_in_base64_payload_raise_on_defect(self):
+        msg = self._str_msg(self.invalid_chars_in_base64_payload,
+                            policy=self.policy.clone(raise_on_defect=True))
+        with self.assertRaises(errors.InvalidBase64CharactersDefect):
+            msg.get_payload(decode=True)
+
+
+class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
+
+    def get_defects(self, obj):
+        return obj.defects
+
+
+class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
+                                        TestEmailBase):
+
+    class CapturePolicy(Compat32):
+        captured = None
+        def register_defect(self, obj, defect):
+            self.captured.append(defect)
+
+    def setUp(self):
+        self.policy = self.CapturePolicy(captured=list())
+
+    def get_defects(self, obj):
+        return self.policy.captured
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -513,6 +513,7 @@
         eq(msg.values(), ['One Hundred', 'Twenty', 'Three', 'Eleven'])
         self.assertRaises(KeyError, msg.replace_header, 'Fourth', 'Missing')
 
+    # test_defect_handling:test_invalid_chars_in_base64_payload
     def test_broken_base64_payload(self):
         x = 'AwDp0P7//y6LwKEAcPa/6Q=9'
         msg = Message()
@@ -520,7 +521,10 @@
         msg['content-transfer-encoding'] = 'base64'
         msg.set_payload(x)
         self.assertEqual(msg.get_payload(decode=True),
-                         bytes(x, 'raw-unicode-escape'))
+                         (b'\x03\x00\xe9\xd0\xfe\xff\xff.\x8b\xc0'
+                          b'\xa1\x00p\xf6\xbf\xe9\x0f'))
+        self.assertIsInstance(msg.defects[0],
+                              errors.InvalidBase64CharactersDefect)
 
     def test_broken_unicode_payload(self):
         # This test improves coverage but is not a compliance test.
@@ -1815,7 +1819,7 @@
         eq(msg.get_content_maintype(), 'text')
         eq(msg.get_content_subtype(), 'plain')
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_same_boundary_inner_outer(self):
         unless = self.assertTrue
         msg = self._msgobj('msg_15.txt')
@@ -1826,7 +1830,7 @@
         unless(isinstance(inner.defects[0],
                           errors.StartBoundaryNotFoundDefect))
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_multipart_no_boundary(self):
         unless = self.assertTrue
         msg = self._msgobj('msg_25.txt')
@@ -1860,7 +1864,7 @@
         --===============3344438784458119861==--
         """)
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_multipart_invalid_cte(self):
         msg = self._str_msg(
             self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
@@ -1868,12 +1872,12 @@
         self.assertIsInstance(msg.defects[0],
             errors.InvalidMultipartContentTransferEncodingDefect)
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_multipart_no_cte_no_defect(self):
         msg = self._str_msg(self.multipart_msg.format(''))
         self.assertEqual(len(msg.defects), 0)
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_multipart_valid_cte_no_defect(self):
         for cte in ('7bit', '8bit', 'BINary'):
             msg = self._str_msg(
@@ -1930,7 +1934,7 @@
 counter to RFC 2822, there's no separating newline here
 """)
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_lying_multipart(self):
         unless = self.assertTrue
         msg = self._msgobj('msg_41.txt')
@@ -1941,7 +1945,7 @@
         unless(isinstance(msg.defects[1],
                           errors.MultipartInvariantViolationDefect))
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_missing_start_boundary(self):
         outer = self._msgobj('msg_42.txt')
         # The message structure is:
@@ -1957,7 +1961,7 @@
         self.assertTrue(isinstance(bad.defects[0],
                                    errors.StartBoundaryNotFoundDefect))
 
-    # test_parser.TestMessageDefectDetectionBase
+    # test_defect_handling
     def test_first_line_is_continuation_header(self):
         eq = self.assertEqual
         m = ' Line 1\nSubject: test\n\nbody'
@@ -3271,15 +3275,19 @@
         self.assertEqual(msg.get_payload(decode=True),
                         'pöstál\n'.encode('utf-8'))
 
+    # test_defect_handling:test_invalid_chars_in_base64_payload
     def test_8bit_in_base64_body(self):
-        # Sticking an 8bit byte in a base64 block makes it undecodable by
-        # normal means, so the block is returned undecoded, but as bytes.
+        # If we get 8bit bytes in a base64 body, we can just ignore them
+        # as being outside the base64 alphabet and decode anyway.  But
+        # we register a defect.
         m = self.bodytest_msg.format(charset='utf-8',
                                      cte='base64',
                                      bodyline='cMO2c3RhbAá=').encode('utf-8')
         msg = email.message_from_bytes(m)
         self.assertEqual(msg.get_payload(decode=True),
-                         'cMO2c3RhbAá=\n'.encode('utf-8'))
+                         'pöstal'.encode('utf-8'))
+        self.assertIsInstance(msg.defects[0],
+                              errors.InvalidBase64CharactersDefect)
 
     def test_8bit_in_uuencode_body(self):
         # Sticking an 8bit byte in a uuencode block makes it undecodable by
diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py
--- a/Lib/test/test_email/test_parser.py
+++ b/Lib/test/test_email/test_parser.py
@@ -1,9 +1,6 @@
 import io
 import email
-import textwrap
 import unittest
-from email._policybase import Compat32
-from email import errors
 from email.message import Message
 from test.test_email import TestEmailBase
 
@@ -35,258 +32,5 @@
     # XXX add tests for other functions that take Message arg.
 
 
-class TestMessageDefectDetectionBase:
-
-    dup_boundary_msg = textwrap.dedent("""\
-        Subject: XX
-        From: xx at xx.dk
-        To: XX
-        Mime-version: 1.0
-        Content-type: multipart/mixed;
-           boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
-
-        --MS_Mac_OE_3071477847_720252_MIME_Part
-        Content-type: multipart/alternative;
-           boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
-
-        --MS_Mac_OE_3071477847_720252_MIME_Part
-        Content-type: text/plain; charset="ISO-8859-1"
-        Content-transfer-encoding: quoted-printable
-
-        text
-
-        --MS_Mac_OE_3071477847_720252_MIME_Part
-        Content-type: text/html; charset="ISO-8859-1"
-        Content-transfer-encoding: quoted-printable
-
-        <HTML></HTML>
-
-        --MS_Mac_OE_3071477847_720252_MIME_Part--
-
-        --MS_Mac_OE_3071477847_720252_MIME_Part
-        Content-type: image/gif; name="xx.gif";
-        Content-disposition: attachment
-        Content-transfer-encoding: base64
-
-        Some removed base64 encoded chars.
-
-        --MS_Mac_OE_3071477847_720252_MIME_Part--
-
-        """)
-
-    def test_same_boundary_inner_outer(self):
-        # XXX better would be to actually detect the duplicate.
-        msg = self._str_msg(self.dup_boundary_msg)
-        inner = msg.get_payload(0)
-        self.assertTrue(hasattr(inner, 'defects'))
-        self.assertEqual(len(self.get_defects(inner)), 1)
-        self.assertTrue(isinstance(self.get_defects(inner)[0],
-                                   errors.StartBoundaryNotFoundDefect))
-
-    def test_same_boundary_inner_outer_raises_on_defect(self):
-        with self.assertRaises(errors.StartBoundaryNotFoundDefect):
-            self._str_msg(self.dup_boundary_msg,
-                policy=self.policy.clone(raise_on_defect=True))
-
-    no_boundary_msg = textwrap.dedent("""\
-        Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
-        From: foobar
-        Subject: broken mail
-        MIME-Version: 1.0
-        Content-Type: multipart/report; report-type=delivery-status;
-
-        --JAB03225.986577786/zinfandel.lacita.com
-
-        One part
-
-        --JAB03225.986577786/zinfandel.lacita.com
-        Content-Type: message/delivery-status
-
-        Header: Another part
-
-        --JAB03225.986577786/zinfandel.lacita.com--
-        """)
-
-    def test_multipart_no_boundary(self):
-        msg = self._str_msg(self.no_boundary_msg)
-        self.assertTrue(isinstance(msg.get_payload(), str))
-        self.assertEqual(len(self.get_defects(msg)), 2)
-        self.assertTrue(isinstance(self.get_defects(msg)[0],
-                                   errors.NoBoundaryInMultipartDefect))
-        self.assertTrue(isinstance(self.get_defects(msg)[1],
-                                   errors.MultipartInvariantViolationDefect))
-
-    def test_multipart_no_boundary_raise_on_defect(self):
-        with self.assertRaises(errors.NoBoundaryInMultipartDefect):
-            self._str_msg(self.no_boundary_msg,
-                policy=self.policy.clone(raise_on_defect=True))
-
-    multipart_msg = textwrap.dedent("""\
-        Date: Wed, 14 Nov 2007 12:56:23 GMT
-        From: foo at bar.invalid
-        To: foo at bar.invalid
-        Subject: Content-Transfer-Encoding: base64 and multipart
-        MIME-Version: 1.0
-        Content-Type: multipart/mixed;
-            boundary="===============3344438784458119861=="{}
-
-        --===============3344438784458119861==
-        Content-Type: text/plain
-
-        Test message
-
-        --===============3344438784458119861==
-        Content-Type: application/octet-stream
-        Content-Transfer-Encoding: base64
-
-        YWJj
-
-        --===============3344438784458119861==--
-        """)
-
-    def test_multipart_invalid_cte(self):
-        msg = self._str_msg(
-            self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
-        self.assertEqual(len(self.get_defects(msg)), 1)
-        self.assertIsInstance(self.get_defects(msg)[0],
-            errors.InvalidMultipartContentTransferEncodingDefect)
-
-    def test_multipart_invalid_cte_raise_on_defect(self):
-        with self.assertRaises(
-                errors.InvalidMultipartContentTransferEncodingDefect):
-            self._str_msg(
-                self.multipart_msg.format(
-                    "\nContent-Transfer-Encoding: base64"),
-                policy=self.policy.clone(raise_on_defect=True))
-
-    def test_multipart_no_cte_no_defect(self):
-        msg = self._str_msg(self.multipart_msg.format(''))
-        self.assertEqual(len(self.get_defects(msg)), 0)
-
-    def test_multipart_valid_cte_no_defect(self):
-        for cte in ('7bit', '8bit', 'BINary'):
-            msg = self._str_msg(
-                self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
-            self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
-
-    lying_multipart_msg = textwrap.dedent("""\
-        From: "Allison Dunlap" <xxx at example.com>
-        To: yyy at example.com
-        Subject: 64423
-        Date: Sun, 11 Jul 2004 16:09:27 -0300
-        MIME-Version: 1.0
-        Content-Type: multipart/alternative;
-
-        Blah blah blah
-        """)
-
-    def test_lying_multipart(self):
-        msg = self._str_msg(self.lying_multipart_msg)
-        self.assertTrue(hasattr(msg, 'defects'))
-        self.assertEqual(len(self.get_defects(msg)), 2)
-        self.assertTrue(isinstance(self.get_defects(msg)[0],
-                                   errors.NoBoundaryInMultipartDefect))
-        self.assertTrue(isinstance(self.get_defects(msg)[1],
-                                   errors.MultipartInvariantViolationDefect))
-
-    def test_lying_multipart_raise_on_defect(self):
-        with self.assertRaises(errors.NoBoundaryInMultipartDefect):
-            self._str_msg(self.lying_multipart_msg,
-                policy=self.policy.clone(raise_on_defect=True))
-
-    missing_start_boundary_msg = textwrap.dedent("""\
-        Content-Type: multipart/mixed; boundary="AAA"
-        From: Mail Delivery Subsystem <xxx at example.com>
-        To: yyy at example.com
-
-        --AAA
-
-        Stuff
-
-        --AAA
-        Content-Type: message/rfc822
-
-        From: webmaster at python.org
-        To: zzz at example.com
-        Content-Type: multipart/mixed; boundary="BBB"
-
-        --BBB--
-
-        --AAA--
-
-        """)
-
-    def test_missing_start_boundary(self):
-        # The message structure is:
-        #
-        # multipart/mixed
-        #    text/plain
-        #    message/rfc822
-        #        multipart/mixed [*]
-        #
-        # [*] This message is missing its start boundary
-        outer = self._str_msg(self.missing_start_boundary_msg)
-        bad = outer.get_payload(1).get_payload(0)
-        self.assertEqual(len(self.get_defects(bad)), 1)
-        self.assertTrue(isinstance(self.get_defects(bad)[0],
-                                   errors.StartBoundaryNotFoundDefect))
-
-    def test_missing_start_boundary_raise_on_defect(self):
-        with self.assertRaises(errors.StartBoundaryNotFoundDefect):
-            self._str_msg(self.missing_start_boundary_msg,
-                          policy=self.policy.clone(raise_on_defect=True))
-
-    def test_first_line_is_continuation_header(self):
-        msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
-        self.assertEqual(msg.keys(), ['Subject'])
-        self.assertEqual(msg.get_payload(), 'body')
-        self.assertEqual(len(self.get_defects(msg)), 1)
-        self.assertDefectsEqual(self.get_defects(msg),
-                                 [errors.FirstHeaderLineIsContinuationDefect])
-        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
-
-    def test_first_line_is_continuation_header_raise_on_defect(self):
-        with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
-            self._str_msg(' Line 1\nSubject: test\n\nbody\n',
-                          policy=self.policy.clone(raise_on_defect=True))
-
-    def test_missing_header_body_separator(self):
-        # Our heuristic if we see a line that doesn't look like a header (no
-        # leading whitespace but no ':') is to assume that the blank line that
-        # separates the header from the body is missing, and to stop parsing
-        # headers and start parsing the body.
-        msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
-        self.assertEqual(msg.keys(), ['Subject'])
-        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
-        self.assertDefectsEqual(self.get_defects(msg),
-                                [errors.MissingHeaderBodySeparatorDefect])
-
-    def test_missing_header_body_separator_raise_on_defect(self):
-        with self.assertRaises(errors.MissingHeaderBodySeparatorDefect):
-            self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
-                          policy=self.policy.clone(raise_on_defect=True))
-
-
-class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
-
-    def get_defects(self, obj):
-        return obj.defects
-
-
-class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
-                                        TestEmailBase):
-
-    class CapturePolicy(Compat32):
-        captured = None
-        def register_defect(self, obj, defect):
-            self.captured.append(defect)
-
-    def setUp(self):
-        self.policy = self.CapturePolicy(captured=list())
-
-    def get_defects(self, obj):
-        return self.policy.captured
-
-
 if __name__ == '__main__':
     unittest.main()

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list