[pypy-svn] pypy default: a patch by ezio melotti. This fixes some unicode-related bugs.

fijal commits-noreply at bitbucket.org
Sat Apr 16 21:11:35 CEST 2011


Author: Maciej Fijalkowski <fijall at gmail.com>
Branch: 
Changeset: r43401:3bc52d356c52
Date: 2011-04-16 21:08 +0200
http://bitbucket.org/pypy/pypy/changeset/3bc52d356c52/

Log:	a patch by ezio melotti. This fixes some unicode-related bugs.

	Details here: http://bugs.python.org/issue8271

diff --git a/pypy/rlib/test/test_runicode.py b/pypy/rlib/test/test_runicode.py
--- a/pypy/rlib/test/test_runicode.py
+++ b/pypy/rlib/test/test_runicode.py
@@ -66,9 +66,10 @@
         assert called[0]
         assert "42424242" in result
 
-    def checkdecodeerror(self, s, encoding, start, stop, addstuff=True):
+    def checkdecodeerror(self, s, encoding, start, stop,
+                         addstuff=True, msg=None):
         called = [0]
-        def errorhandler(errors, enc, msg, t, startingpos,
+        def errorhandler(errors, enc, errmsg, t, startingpos,
                          endingpos):
             called[0] += 1
             if called[0] == 1:
@@ -77,6 +78,8 @@
                 assert t is s
                 assert start == startingpos
                 assert stop == endingpos
+                if msg is not None:
+                    assert errmsg == msg
                 return u"42424242", stop
             return u"", endingpos
         decoder = self.getdecoder(encoding)
@@ -90,7 +93,7 @@
 
 
 class TestDecoding(UnicodeTests):
-    
+
     # XXX test bom recognition in utf-16
     # XXX test proper error handling
 
@@ -131,6 +134,96 @@
                          "utf-32 utf-32-be utf-32-le").split():
             self.checkdecode(uni, encoding)
 
+    def test_ascii_error(self):
+        self.checkdecodeerror("abc\xFF\xFF\xFFcde", "ascii", 3, 4)
+
+    def test_utf16_errors(self):
+        # truncated BOM
+        for s in ["\xff", "\xfe"]:
+            self.checkdecodeerror(s, "utf-16", 0, len(s), addstuff=False)
+
+        for s in [
+                  # unexpected end of data ascii
+                  "\xff\xfeF",
+                  # unexpected end of data
+                  '\xff\xfe\xc0\xdb\x00', '\xff\xfe\xc0\xdb', '\xff\xfe\xc0',
+                  ]:
+            self.checkdecodeerror(s, "utf-16", 2, len(s), addstuff=False)
+        for s in [
+                  # illegal surrogate
+                  "\xff\xfe\xff\xdb\xff\xff",
+                  ]:
+            self.checkdecodeerror(s, "utf-16", 2, 4, addstuff=False)
+
+    def test_utf16_bugs(self):
+        s = '\x80-\xe9\xdeL\xa3\x9b'
+        py.test.raises(UnicodeDecodeError, runicode.str_decode_utf_16_le,
+                       s, len(s), True)
+
+    def test_utf7_bugs(self):
+        u = u'A\u2262\u0391.'
+        assert runicode.unicode_encode_utf_7(u, len(u), None) == 'A+ImIDkQ.'
+
+    def test_utf7_tofrom_utf8_bug(self):
+        def _assert_decu7(input, expected):
+            assert runicode.str_decode_utf_7(input, len(input), None) == (expected, len(input))
+
+        _assert_decu7('+-', u'+')
+        _assert_decu7('+-+-', u'++')
+        _assert_decu7('+-+AOQ-', u'+\xe4')
+        _assert_decu7('+AOQ-', u'\xe4')
+        _assert_decu7('+AOQ-', u'\xe4')
+        _assert_decu7('+AOQ- ', u'\xe4 ')
+        _assert_decu7(' +AOQ-', u' \xe4')
+        _assert_decu7(' +AOQ- ', u' \xe4 ')
+        _assert_decu7('+AOQ-+AOQ-', u'\xe4\xe4')
+
+        s_utf7 = 'Die M+AOQ-nner +AOQ-rgen sich!'
+        s_utf8 = u'Die M&#228;nner &#228;rgen sich!'
+        s_utf8_esc = u'Die M\xe4nner \xe4rgen sich!'
+
+        _assert_decu7(s_utf7, s_utf8_esc)
+        _assert_decu7(s_utf7, s_utf8)
+
+        assert runicode.unicode_encode_utf_7(s_utf8_esc, len(s_utf8_esc), None) == s_utf7
+        assert runicode.unicode_encode_utf_7(s_utf8,     len(s_utf8_esc), None) == s_utf7
+
+    def test_utf7_partial(self):
+        s = u"a+-b".encode('utf-7')
+        assert s == "a+--b"
+        decode = self.getdecoder('utf-7')
+        assert decode(s, 1, None) == (u'a', 1)
+        assert decode(s, 2, None) == (u'a', 1)
+        assert decode(s, 3, None) == (u'a+', 3)
+        assert decode(s, 4, None) == (u'a+-', 4)
+        assert decode(s, 5, None) == (u'a+-b', 5)
+
+    def test_utf7_surrogates(self):
+        encode = self.getencoder('utf-7')
+        u = u'\U000abcde'
+        assert encode(u, len(u), None) == '+2m/c3g-'
+        decode = self.getdecoder('utf-7')
+        s = '+3ADYAA-'
+        raises(UnicodeError, decode, s, len(s), None)
+        def replace_handler(errors, codec, message, input, start, end):
+            return u'?', end
+        assert decode(s, len(s), None, final=True,
+                      errorhandler = replace_handler) == (u'??', len(s))
+
+
+class TestUTF8Decoding(UnicodeTests):
+    def __init__(self):
+        self.decoder = self.getdecoder('utf-8')
+
+    def replace_handler(self, errors, codec, message, input, start, end):
+        return u'\ufffd', end
+
+    def ignore_handler(self, errors, codec, message, input, start, end):
+        return u'', end
+
+    def to_bytestring(self, bytes):
+        return ''.join(chr(int(c, 16)) for c in bytes.split())
+
     def test_single_chars_utf8(self):
         for s in ["\xd7\x90", "\xd6\x96", "\xeb\x96\x95", "\xf0\x90\x91\x93"]:
             self.checkdecode(s, "utf-8")
@@ -140,30 +233,297 @@
         # This test will raise an error with python 3.x
         self.checkdecode(u"\ud800", "utf-8")
 
+    def test_invalid_start_byte(self):
+        """
+        Test that an 'invalid start byte' error is raised when the first byte
+        is not in the ASCII range or is not a valid start byte of a 2-, 3-, or
+        4-bytes sequence. The invalid start byte is replaced with a single
+        U+FFFD when errors='replace'.
+        E.g. <80> is a continuation byte and can appear only after a start byte.
+        """
+        FFFD = u'\ufffd'
+        for byte in '\x80\xA0\x9F\xBF\xC0\xC1\xF5\xFF':
+            raises(UnicodeDecodeError, self.decoder, byte, 1, None, final=True)
+            self.checkdecodeerror(byte, 'utf-8', 0, 1, addstuff=False,
+                                  msg='invalid start byte')
+            assert self.decoder(byte, 1, None, final=True,
+                       errorhandler=self.replace_handler) == (FFFD, 1)
+            assert (self.decoder('aaaa' + byte + 'bbbb', 9, None,
+                        final=True, errorhandler=self.replace_handler) ==
+                        (u'aaaa'+ FFFD + u'bbbb', 9))
+            assert self.decoder(byte, 1, None, final=True,
+                           errorhandler=self.ignore_handler) == (u'', 1)
+            assert (self.decoder('aaaa' + byte + 'bbbb', 9, None,
+                        final=True, errorhandler=self.ignore_handler) ==
+                        (u'aaaabbbb', 9))
+
+    def test_unexpected_end_of_data(self):
+        """
+        Test that an 'unexpected end of data' error is raised when the string
+        ends after a start byte of a 2-, 3-, or 4-bytes sequence without having
+        enough continuation bytes.  The incomplete sequence is replaced with a
+        single U+FFFD when errors='replace'.
+        E.g. in the sequence <F3 80 80>, F3 is the start byte of a 4-bytes
+        sequence, but it's followed by only 2 valid continuation bytes and the
+        last continuation byte is missing.
+        Note: the continuation bytes must be all valid, if one of them is
+        invalid another error will be raised.
+        """
+        sequences = [
+            'C2', 'DF',
+            'E0 A0', 'E0 BF', 'E1 80', 'E1 BF', 'EC 80', 'EC BF',
+            'ED 80', 'ED 9F', 'EE 80', 'EE BF', 'EF 80', 'EF BF',
+            'F0 90', 'F0 BF', 'F0 90 80', 'F0 90 BF', 'F0 BF 80', 'F0 BF BF',
+            'F1 80', 'F1 BF', 'F1 80 80', 'F1 80 BF', 'F1 BF 80', 'F1 BF BF',
+            'F3 80', 'F3 BF', 'F3 80 80', 'F3 80 BF', 'F3 BF 80', 'F3 BF BF',
+            'F4 80', 'F4 8F', 'F4 80 80', 'F4 80 BF', 'F4 8F 80', 'F4 8F BF'
+        ]
+        FFFD = u'\ufffd'
+        for seq in sequences:
+            seq = self.to_bytestring(seq)
+            raises(UnicodeDecodeError, self.decoder, seq, len(seq),
+                   None, final=True)
+            self.checkdecodeerror(seq, 'utf-8', 0, len(seq), addstuff=False,
+                                  msg='unexpected end of data')
+            assert self.decoder(seq, len(seq), None, final=True,
+                       errorhandler=self.replace_handler) == (FFFD, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.replace_handler) ==
+                        (u'aaaa'+ FFFD + u'bbbb', len(seq) + 8))
+            assert self.decoder(seq, len(seq), None, final=True,
+                           errorhandler=self.ignore_handler) == (u'', len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.ignore_handler) ==
+                        (u'aaaabbbb', len(seq) + 8))
+
+    def test_invalid_cb_for_2bytes_seq(self):
+        """
+        Test that an 'invalid continuation byte' error is raised when the
+        continuation byte of a 2-bytes sequence is invalid.  The start byte
+        is replaced by a single U+FFFD and the second byte is handled
+        separately when errors='replace'.
+        E.g. in the sequence <C2 41>, C2 is the start byte of a 2-bytes
+        sequence, but 41 is not a valid continuation byte because it's the
+        ASCII letter 'A'.
+        """
+        FFFD = u'\ufffd'
+        FFFDx2 = FFFD * 2
+        sequences = [
+            ('C2 00', FFFD+u'\x00'), ('C2 7F', FFFD+u'\x7f'),
+            ('C2 C0', FFFDx2), ('C2 FF', FFFDx2),
+            ('DF 00', FFFD+u'\x00'), ('DF 7F', FFFD+u'\x7f'),
+            ('DF C0', FFFDx2), ('DF FF', FFFDx2),
+        ]
+        for seq, res in sequences:
+            seq = self.to_bytestring(seq)
+            raises(UnicodeDecodeError, self.decoder, seq, len(seq),
+                   None, final=True)
+            self.checkdecodeerror(seq, 'utf-8', 0, 1, addstuff=False,
+                                  msg='invalid continuation byte')
+            assert self.decoder(seq, len(seq), None, final=True,
+                       errorhandler=self.replace_handler) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.replace_handler) ==
+                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+            res = res.replace(FFFD, u'')
+            assert self.decoder(seq, len(seq), None, final=True,
+                           errorhandler=self.ignore_handler) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.ignore_handler) ==
+                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+
+    def test_invalid_cb_for_3bytes_seq(self):
+        """
+        Test that an 'invalid continuation byte' error is raised when the
+        continuation byte(s) of a 3-bytes sequence are invalid.  When
+        errors='replace', if the first continuation byte is valid, the first
+        two bytes (start byte + 1st cb) are replaced by a single U+FFFD and the
+        third byte is handled separately, otherwise only the start byte is
+        replaced with a U+FFFD and the other continuation bytes are handled
+        separately.
+        E.g. in the sequence <E1 80 41>, E1 is the start byte of a 3-bytes
+        sequence, 80 is a valid continuation byte, but 41 is not a valid cb
+        because it's the ASCII letter 'A'.
+        Note: when the start byte is E0 or ED, the valid ranges for the first
+        continuation byte are limited to A0..BF and 80..9F respectively.
+        However, when the start byte is ED, Python 2 considers all the bytes
+        in range 80..BF valid.  This is fixed in Python 3.
+        """
+        FFFD = u'\ufffd'
+        FFFDx2 = FFFD * 2
+        sequences = [
+            ('E0 00', FFFD+u'\x00'), ('E0 7F', FFFD+u'\x7f'), ('E0 80', FFFDx2),
+            ('E0 9F', FFFDx2), ('E0 C0', FFFDx2), ('E0 FF', FFFDx2),
+            ('E0 A0 00', FFFD+u'\x00'), ('E0 A0 7F', FFFD+u'\x7f'),
+            ('E0 A0 C0', FFFDx2), ('E0 A0 FF', FFFDx2),
+            ('E0 BF 00', FFFD+u'\x00'), ('E0 BF 7F', FFFD+u'\x7f'),
+            ('E0 BF C0', FFFDx2), ('E0 BF FF', FFFDx2), ('E1 00', FFFD+u'\x00'),
+            ('E1 7F', FFFD+u'\x7f'), ('E1 C0', FFFDx2), ('E1 FF', FFFDx2),
+            ('E1 80 00', FFFD+u'\x00'), ('E1 80 7F', FFFD+u'\x7f'),
+            ('E1 80 C0', FFFDx2), ('E1 80 FF', FFFDx2),
+            ('E1 BF 00', FFFD+u'\x00'), ('E1 BF 7F', FFFD+u'\x7f'),
+            ('E1 BF C0', FFFDx2), ('E1 BF FF', FFFDx2), ('EC 00', FFFD+u'\x00'),
+            ('EC 7F', FFFD+u'\x7f'), ('EC C0', FFFDx2), ('EC FF', FFFDx2),
+            ('EC 80 00', FFFD+u'\x00'), ('EC 80 7F', FFFD+u'\x7f'),
+            ('EC 80 C0', FFFDx2), ('EC 80 FF', FFFDx2),
+            ('EC BF 00', FFFD+u'\x00'), ('EC BF 7F', FFFD+u'\x7f'),
+            ('EC BF C0', FFFDx2), ('EC BF FF', FFFDx2), ('ED 00', FFFD+u'\x00'),
+            ('ED 7F', FFFD+u'\x7f'),
+            # ('ED A0', FFFDx2), ('ED BF', FFFDx2), # see note ^
+            ('ED C0', FFFDx2), ('ED FF', FFFDx2), ('ED 80 00', FFFD+u'\x00'),
+            ('ED 80 7F', FFFD+u'\x7f'), ('ED 80 C0', FFFDx2),
+            ('ED 80 FF', FFFDx2), ('ED 9F 00', FFFD+u'\x00'),
+            ('ED 9F 7F', FFFD+u'\x7f'), ('ED 9F C0', FFFDx2),
+            ('ED 9F FF', FFFDx2), ('EE 00', FFFD+u'\x00'),
+            ('EE 7F', FFFD+u'\x7f'), ('EE C0', FFFDx2), ('EE FF', FFFDx2),
+            ('EE 80 00', FFFD+u'\x00'), ('EE 80 7F', FFFD+u'\x7f'),
+            ('EE 80 C0', FFFDx2), ('EE 80 FF', FFFDx2),
+            ('EE BF 00', FFFD+u'\x00'), ('EE BF 7F', FFFD+u'\x7f'),
+            ('EE BF C0', FFFDx2), ('EE BF FF', FFFDx2), ('EF 00', FFFD+u'\x00'),
+            ('EF 7F', FFFD+u'\x7f'), ('EF C0', FFFDx2), ('EF FF', FFFDx2),
+            ('EF 80 00', FFFD+u'\x00'), ('EF 80 7F', FFFD+u'\x7f'),
+            ('EF 80 C0', FFFDx2), ('EF 80 FF', FFFDx2),
+            ('EF BF 00', FFFD+u'\x00'), ('EF BF 7F', FFFD+u'\x7f'),
+            ('EF BF C0', FFFDx2), ('EF BF FF', FFFDx2),
+        ]
+        for seq, res in sequences:
+            seq = self.to_bytestring(seq)
+            raises(UnicodeDecodeError, self.decoder, seq, len(seq),
+                   None, final=True)
+            self.checkdecodeerror(seq, 'utf-8', 0, len(seq)-1, addstuff=False,
+                                  msg='invalid continuation byte')
+            assert self.decoder(seq, len(seq), None, final=True,
+                       errorhandler=self.replace_handler) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.replace_handler) ==
+                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+            res = res.replace(FFFD, u'')
+            assert self.decoder(seq, len(seq), None, final=True,
+                           errorhandler=self.ignore_handler) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.ignore_handler) ==
+                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+
+    def test_invalid_cb_for_4bytes_seq(self):
+        """
+        Test that an 'invalid continuation byte' error is raised when the
+        continuation byte(s) of a 4-bytes sequence are invalid.  When
+        errors='replace', the start byte and all the following valid
+        continuation bytes are replaced with a single U+FFFD, and all the bytes
+        starting from the first invalid continuation byte (included) are
+        handled separately.
+        E.g. in the sequence <F1 80 41>, F1 is the start byte of a 4-bytes
+        sequence, 80 is a valid continuation byte, but 41 is not a valid cb
+        because it's the ASCII letter 'A'.
+        Note: when the start byte is F0 or F4, the valid ranges for the first
+        continuation byte are limited to 90..BF and 80..8F respectively.
+        (The analogous E0/ED restriction for 3-bytes sequences, and the
+        Python 2 handling of ED, are described in the 3-bytes test.)
+        """
+        FFFD = u'\ufffd'
+        FFFDx2 = FFFD * 2
+        sequences = [
+            ('F0 00', FFFD+u'\x00'), ('F0 7F', FFFD+u'\x7f'), ('F0 80', FFFDx2),
+            ('F0 8F', FFFDx2), ('F0 C0', FFFDx2), ('F0 FF', FFFDx2),
+            ('F0 90 00', FFFD+u'\x00'), ('F0 90 7F', FFFD+u'\x7f'),
+            ('F0 90 C0', FFFDx2), ('F0 90 FF', FFFDx2),
+            ('F0 BF 00', FFFD+u'\x00'), ('F0 BF 7F', FFFD+u'\x7f'),
+            ('F0 BF C0', FFFDx2), ('F0 BF FF', FFFDx2),
+            ('F0 90 80 00', FFFD+u'\x00'), ('F0 90 80 7F', FFFD+u'\x7f'),
+            ('F0 90 80 C0', FFFDx2), ('F0 90 80 FF', FFFDx2),
+            ('F0 90 BF 00', FFFD+u'\x00'), ('F0 90 BF 7F', FFFD+u'\x7f'),
+            ('F0 90 BF C0', FFFDx2), ('F0 90 BF FF', FFFDx2),
+            ('F0 BF 80 00', FFFD+u'\x00'), ('F0 BF 80 7F', FFFD+u'\x7f'),
+            ('F0 BF 80 C0', FFFDx2), ('F0 BF 80 FF', FFFDx2),
+            ('F0 BF BF 00', FFFD+u'\x00'), ('F0 BF BF 7F', FFFD+u'\x7f'),
+            ('F0 BF BF C0', FFFDx2), ('F0 BF BF FF', FFFDx2),
+            ('F1 00', FFFD+u'\x00'), ('F1 7F', FFFD+u'\x7f'), ('F1 C0', FFFDx2),
+            ('F1 FF', FFFDx2), ('F1 80 00', FFFD+u'\x00'),
+            ('F1 80 7F', FFFD+u'\x7f'), ('F1 80 C0', FFFDx2),
+            ('F1 80 FF', FFFDx2), ('F1 BF 00', FFFD+u'\x00'),
+            ('F1 BF 7F', FFFD+u'\x7f'), ('F1 BF C0', FFFDx2),
+            ('F1 BF FF', FFFDx2), ('F1 80 80 00', FFFD+u'\x00'),
+            ('F1 80 80 7F', FFFD+u'\x7f'), ('F1 80 80 C0', FFFDx2),
+            ('F1 80 80 FF', FFFDx2), ('F1 80 BF 00', FFFD+u'\x00'),
+            ('F1 80 BF 7F', FFFD+u'\x7f'), ('F1 80 BF C0', FFFDx2),
+            ('F1 80 BF FF', FFFDx2), ('F1 BF 80 00', FFFD+u'\x00'),
+            ('F1 BF 80 7F', FFFD+u'\x7f'), ('F1 BF 80 C0', FFFDx2),
+            ('F1 BF 80 FF', FFFDx2), ('F1 BF BF 00', FFFD+u'\x00'),
+            ('F1 BF BF 7F', FFFD+u'\x7f'), ('F1 BF BF C0', FFFDx2),
+            ('F1 BF BF FF', FFFDx2), ('F3 00', FFFD+u'\x00'),
+            ('F3 7F', FFFD+u'\x7f'), ('F3 C0', FFFDx2), ('F3 FF', FFFDx2),
+            ('F3 80 00', FFFD+u'\x00'), ('F3 80 7F', FFFD+u'\x7f'),
+            ('F3 80 C0', FFFDx2), ('F3 80 FF', FFFDx2),
+            ('F3 BF 00', FFFD+u'\x00'), ('F3 BF 7F', FFFD+u'\x7f'),
+            ('F3 BF C0', FFFDx2), ('F3 BF FF', FFFDx2),
+            ('F3 80 80 00', FFFD+u'\x00'), ('F3 80 80 7F', FFFD+u'\x7f'),
+            ('F3 80 80 C0', FFFDx2), ('F3 80 80 FF', FFFDx2),
+            ('F3 80 BF 00', FFFD+u'\x00'), ('F3 80 BF 7F', FFFD+u'\x7f'),
+            ('F3 80 BF C0', FFFDx2), ('F3 80 BF FF', FFFDx2),
+            ('F3 BF 80 00', FFFD+u'\x00'), ('F3 BF 80 7F', FFFD+u'\x7f'),
+            ('F3 BF 80 C0', FFFDx2), ('F3 BF 80 FF', FFFDx2),
+            ('F3 BF BF 00', FFFD+u'\x00'), ('F3 BF BF 7F', FFFD+u'\x7f'),
+            ('F3 BF BF C0', FFFDx2), ('F3 BF BF FF', FFFDx2),
+            ('F4 00', FFFD+u'\x00'), ('F4 7F', FFFD+u'\x7f'), ('F4 90', FFFDx2),
+            ('F4 BF', FFFDx2), ('F4 C0', FFFDx2), ('F4 FF', FFFDx2),
+            ('F4 80 00', FFFD+u'\x00'), ('F4 80 7F', FFFD+u'\x7f'),
+            ('F4 80 C0', FFFDx2), ('F4 80 FF', FFFDx2),
+            ('F4 8F 00', FFFD+u'\x00'), ('F4 8F 7F', FFFD+u'\x7f'),
+            ('F4 8F C0', FFFDx2), ('F4 8F FF', FFFDx2),
+            ('F4 80 80 00', FFFD+u'\x00'), ('F4 80 80 7F', FFFD+u'\x7f'),
+            ('F4 80 80 C0', FFFDx2), ('F4 80 80 FF', FFFDx2),
+            ('F4 80 BF 00', FFFD+u'\x00'), ('F4 80 BF 7F', FFFD+u'\x7f'),
+            ('F4 80 BF C0', FFFDx2), ('F4 80 BF FF', FFFDx2),
+            ('F4 8F 80 00', FFFD+u'\x00'), ('F4 8F 80 7F', FFFD+u'\x7f'),
+            ('F4 8F 80 C0', FFFDx2), ('F4 8F 80 FF', FFFDx2),
+            ('F4 8F BF 00', FFFD+u'\x00'), ('F4 8F BF 7F', FFFD+u'\x7f'),
+            ('F4 8F BF C0', FFFDx2), ('F4 8F BF FF', FFFDx2)
+        ]
+        for seq, res in sequences:
+            seq = self.to_bytestring(seq)
+            raises(UnicodeDecodeError, self.decoder, seq, len(seq),
+                   None, final=True)
+            self.checkdecodeerror(seq, 'utf-8', 0, len(seq)-1, addstuff=False,
+                                  msg='invalid continuation byte')
+            assert self.decoder(seq, len(seq), None, final=True,
+                       errorhandler=self.replace_handler) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.replace_handler) ==
+                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+            res = res.replace(FFFD, u'')
+            assert self.decoder(seq, len(seq), None, final=True,
+                           errorhandler=self.ignore_handler) == (res, len(seq))
+            assert (self.decoder('aaaa' + seq + 'bbbb', len(seq) + 8, None,
+                        final=True, errorhandler=self.ignore_handler) ==
+                        (u'aaaa' + res + u'bbbb', len(seq) + 8))
+
     def test_utf8_errors(self):
-        for s in [# unexpected end of data
-                  "\xd7", "\xd6", "\xeb\x96", "\xf0\x90\x91"]:
-            self.checkdecodeerror(s, "utf-8", 0, len(s), addstuff=False)
-
-        # unexpected code byte
-        for s in ["\x81", "\xbf"]:
-            self.checkdecodeerror(s, "utf-8", 0, 1, addstuff=True)
+        # unexpected end of data
+        for s in ['\xd7', '\xd6', '\xeb\x96', '\xf0\x90\x91', '\xc2', '\xdf']:
+            self.checkdecodeerror(s, 'utf-8', 0, len(s), addstuff=False,
+                                  msg='unexpected end of data')
 
         # invalid data 2 byte
         for s in ["\xd7\x50", "\xd6\x06", "\xd6\xD6"]:
-            self.checkdecodeerror(s, "utf-8", 0, 1, addstuff=True)
+            self.checkdecodeerror(s, "utf-8", 0, 1, addstuff=True,
+                                  msg='invalid continuation byte')
         # invalid data 3 byte
         for s in ["\xeb\x56\x95", "\xeb\x06\x95", "\xeb\xD6\x95"]:
-            self.checkdecodeerror(s, "utf-8", 0, 1, addstuff=True)
+            self.checkdecodeerror(s, "utf-8", 0, 1, addstuff=True,
+                                  msg='invalid continuation byte')
         for s in ["\xeb\x96\x55", "\xeb\x96\x05", "\xeb\x96\xD5"]:
-            self.checkdecodeerror(s, "utf-8", 0, 2, addstuff=True)
+            self.checkdecodeerror(s, "utf-8", 0, 2, addstuff=True,
+                                  msg='invalid continuation byte')
         # invalid data 4 byte
         for s in ["\xf0\x50\x91\x93", "\xf0\x00\x91\x93", "\xf0\xd0\x91\x93"]:
-            self.checkdecodeerror(s, "utf-8", 0, 1, addstuff=True)
+            self.checkdecodeerror(s, "utf-8", 0, 1, addstuff=True,
+                                  msg='invalid continuation byte')
         for s in ["\xf0\x90\x51\x93", "\xf0\x90\x01\x93", "\xf0\x90\xd1\x93"]:
-            self.checkdecodeerror(s, "utf-8", 0, 2, addstuff=True)
+            self.checkdecodeerror(s, "utf-8", 0, 2, addstuff=True,
+                                  msg='invalid continuation byte')
         for s in ["\xf0\x90\x91\x53", "\xf0\x90\x91\x03", "\xf0\x90\x91\xd3"]:
-            self.checkdecodeerror(s, "utf-8", 0, 3, addstuff=True)
+            self.checkdecodeerror(s, "utf-8", 0, 3, addstuff=True,
+                                  msg='invalid continuation byte')
 
 
     def test_issue8271(self):
@@ -249,97 +609,18 @@
             ('\x61\xF1\x80\x80\xE1\x80\xC2\x62\x80\x63\x80\xBF\x64',
              u'\x61\uFFFD\uFFFD\uFFFD\x62\uFFFD\x63\uFFFD\uFFFD\x64'),
         ]
-        def replace_handler(errors, codec, message, input, start, end):
-            return FFFD, end
-        def ignore_handler(errors, codec, message, input, start, end):
-            return u'', end
+
         for n, (seq, res) in enumerate(sequences):
             decoder = self.getdecoder('utf-8')
             raises(UnicodeDecodeError, decoder, seq, len(seq), None, final=True)
             assert decoder(seq, len(seq), None, final=True,
-                           errorhandler=replace_handler) == (res, len(seq))
+                           errorhandler=self.replace_handler) == (res, len(seq))
             assert decoder(seq + 'b', len(seq) + 1, None, final=True,
-                           errorhandler=replace_handler) == (res + u'b',
-                                                             len(seq) + 1)
+                           errorhandler=self.replace_handler) == (res + u'b',
+                                                                  len(seq) + 1)
             res = res.replace(FFFD, u'')
             assert decoder(seq, len(seq), None, final=True,
-                           errorhandler=ignore_handler) == (res, len(seq))
-
-    def test_ascii_error(self):
-        self.checkdecodeerror("abc\xFF\xFF\xFFcde", "ascii", 3, 4)
-
-    def test_utf16_errors(self):
-        # trunkated BOM
-        for s in ["\xff", "\xfe"]:
-            self.checkdecodeerror(s, "utf-16", 0, len(s), addstuff=False)
-
-        for s in [
-                  # unexpected end of data ascii
-                  "\xff\xfeF",
-                  # unexpected end of data
-                  '\xff\xfe\xc0\xdb\x00', '\xff\xfe\xc0\xdb', '\xff\xfe\xc0', 
-                  ]:
-            self.checkdecodeerror(s, "utf-16", 2, len(s), addstuff=False)
-        for s in [
-                  # illegal surrogate
-                  "\xff\xfe\xff\xdb\xff\xff",
-                  ]:
-            self.checkdecodeerror(s, "utf-16", 2, 4, addstuff=False)
-
-    def test_utf16_bugs(self):
-        s = '\x80-\xe9\xdeL\xa3\x9b'
-        py.test.raises(UnicodeDecodeError, runicode.str_decode_utf_16_le,
-                       s, len(s), True)
-
-    def test_utf7_bugs(self):
-        u = u'A\u2262\u0391.'
-        assert runicode.unicode_encode_utf_7(u, len(u), None) == 'A+ImIDkQ.'
-
-    def test_utf7_tofrom_utf8_bug(self):
-        def _assert_decu7(input, expected):
-            assert runicode.str_decode_utf_7(input, len(input), None) == (expected, len(input))
-
-        _assert_decu7('+-', u'+')
-        _assert_decu7('+-+-', u'++')
-        _assert_decu7('+-+AOQ-', u'+\xe4')
-        _assert_decu7('+AOQ-', u'\xe4')
-        _assert_decu7('+AOQ-', u'\xe4')
-        _assert_decu7('+AOQ- ', u'\xe4 ')
-        _assert_decu7(' +AOQ-', u' \xe4')
-        _assert_decu7(' +AOQ- ', u' \xe4 ')
-        _assert_decu7('+AOQ-+AOQ-', u'\xe4\xe4')
-
-        s_utf7 = 'Die M+AOQ-nner +AOQ-rgen sich!'
-        s_utf8 = u'Die M&#228;nner &#228;rgen sich!'
-        s_utf8_esc = u'Die M\xe4nner \xe4rgen sich!'
-
-        _assert_decu7(s_utf7, s_utf8_esc)
-        _assert_decu7(s_utf7, s_utf8)
-
-        assert runicode.unicode_encode_utf_7(s_utf8_esc, len(s_utf8_esc), None) == s_utf7
-        assert runicode.unicode_encode_utf_7(s_utf8,     len(s_utf8_esc), None) == s_utf7
-
-    def test_utf7_partial(self):
-        s = u"a+-b".encode('utf-7')
-        assert s == "a+--b"
-        decode = self.getdecoder('utf-7')
-        assert decode(s, 1, None) == (u'a', 1)
-        assert decode(s, 2, None) == (u'a', 1)
-        assert decode(s, 3, None) == (u'a+', 3)
-        assert decode(s, 4, None) == (u'a+-', 4)
-        assert decode(s, 5, None) == (u'a+-b', 5)
-
-    def test_utf7_surrogates(self):
-        encode = self.getencoder('utf-7')
-        u = u'\U000abcde'
-        assert encode(u, len(u), None) == '+2m/c3g-'
-        decode = self.getdecoder('utf-7')
-        s = '+3ADYAA-'
-        raises(UnicodeError, decode, s, len(s), None)
-        def replace_handler(errors, codec, message, input, start, end):
-            return u'?', end
-        assert decode(s, len(s), None, final=True,
-                      errorhandler = replace_handler) == (u'??', len(s))
+                           errorhandler=self.ignore_handler) == (res, len(seq))
 
 
 class TestEncoding(UnicodeTests):
@@ -376,7 +657,7 @@
                 self.checkencode(uni, "utf-7")
             for encoding in ("utf-8 utf-16 utf-16-be utf-16-le "
                              "utf-32 utf-32-be utf-32-le").split():
-                self.checkencode(uni, encoding)                
+                self.checkencode(uni, encoding)
 
     def test_maxunicode(self):
         uni = unichr(sys.maxunicode)
@@ -384,7 +665,7 @@
             self.checkencode(uni, "utf-7")
         for encoding in ("utf-8 utf-16 utf-16-be utf-16-le "
                          "utf-32 utf-32-be utf-32-le").split():
-            self.checkencode(uni, encoding)        
+            self.checkencode(uni, encoding)
 
     def test_single_chars_utf8(self):
         # check every number of bytes per char
@@ -394,7 +675,7 @@
     def test_utf8_surrogates(self):
         # check replacing of two surrogates by single char while encoding
         # make sure that the string itself is not marshalled
-        u = u"\ud800" 
+        u = u"\ud800"
         for i in range(4):
             u += u"\udc00"
         self.checkencode(u, "utf-8")
@@ -422,7 +703,7 @@
     def test_utf8(self):
         from pypy.rpython.test.test_llinterp import interpret
         def f(x):
-            
+
             s1 = "".join(["\xd7\x90\xd6\x96\xeb\x96\x95\xf0\x90\x91\x93"] * x)
             u, consumed = runicode.str_decode_utf_8(s1, len(s1), True)
             s2 = runicode.unicode_encode_utf_8(u, len(u), True)
@@ -438,6 +719,6 @@
             u = runicode.UNICHR(x)
             t = runicode.ORD(u)
             return t
-            
+
         res = interpret(f, [0x10140])
         assert res == 0x10140

diff --git a/pypy/rlib/runicode.py b/pypy/rlib/runicode.py
--- a/pypy/rlib/runicode.py
+++ b/pypy/rlib/runicode.py
@@ -87,8 +87,9 @@
     result = UnicodeBuilder(size)
     pos = 0
     while pos < size:
-        ch = s[pos]
-        ordch1 = ord(ch)
+        ordch1 = ord(s[pos])
+        # fast path for ASCII
+        # XXX maybe use a while loop here
         if ordch1 < 0x80:
             result.append(unichr(ordch1))
             pos += 1
@@ -98,110 +99,149 @@
         if pos + n > size:
             if not final:
                 break
-            else:
-                endpos = pos + 1
-                while endpos < size and ord(s[endpos]) & 0xC0 == 0x80:
-                    endpos += 1
-                r, pos = errorhandler(errors, "utf-8",
-                                      "unexpected end of data",
-                                      s,  pos, endpos)
+            charsleft = size - pos - 1 # either 0, 1, 2
+            # note: when we get the 'unexpected end of data' we don't care
+            # about the pos anymore and we just ignore the value
+            if not charsleft:
+                # there's only the start byte and nothing else
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'unexpected end of data',
+                                      s, pos, pos+1)
+                result.append(r)
+                break
+            ordch2 = ord(s[pos+1])
+            if n == 3:
+                # 3-bytes seq with only a continuation byte
+                if (ordch2>>6 != 0b10 or
+                    (ordch1 == 0xe0 and ordch2 < 0xa0)):
+                    # or (ordch1 == 0xed and ordch2 > 0x9f)
+                    # second byte invalid, take the first and continue
+                    r, pos = errorhandler(errors, 'utf-8',
+                                          'invalid continuation byte',
+                                          s, pos, pos+1)
+                    result.append(r)
+                    continue
+                else:
+                    # second byte valid, but third byte missing
+                    r, pos = errorhandler(errors, 'utf-8',
+                                      'unexpected end of data',
+                                      s, pos, pos+2)
+                    result.append(r)
+                    break
+            elif n == 4:
+                # 4-bytes seq with 1 or 2 continuation bytes
+                if (ordch2>>6 != 0b10 or
+                    (ordch1 == 0xf0 and ordch2 < 0x90) or
+                    (ordch1 == 0xf4 and ordch2 > 0x8f)):
+                    # second byte invalid, take the first and continue
+                    r, pos = errorhandler(errors, 'utf-8',
+                                          'invalid continuation byte',
+                                          s, pos, pos+1)
+                    result.append(r)
+                    continue
+                elif charsleft == 2 and ord(s[pos+2])>>6 != 0b10:
+                    # third byte invalid, take the first two and continue
+                    r, pos = errorhandler(errors, 'utf-8',
+                                          'invalid continuation byte',
+                                          s, pos, pos+2)
+                    result.append(r)
+                    continue
+                else:
+                    # there's only 1 or 2 valid cb, but the others are missing
+                    r, pos = errorhandler(errors, 'utf-8',
+                                      'unexpected end of data',
+                                      s, pos, pos+charsleft+1)
+                    result.append(r)
+                    break
+
+        if n == 0:
+            r, pos = errorhandler(errors, 'utf-8',
+                                  'invalid start byte',
+                                  s, pos, pos+1)
+            result.append(r)
+
+        elif n == 1:
+            assert 0, "ascii should have gone through the fast path"
+
+        elif n == 2:
+            ordch2 = ord(s[pos+1])
+            if ordch2>>6 != 0b10:
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'invalid continuation byte',
+                                      s, pos, pos+1)
                 result.append(r)
                 continue
+            # 110yyyyy 10zzzzzz -> 00000000 00000yyy yyzzzzzz
+            result.append(unichr(((ordch1 & 0b00011111) << 6) +
+                                 (ordch2 & 0b00111111)))
+            pos += 2
 
-        if n == 0:
-            r, pos = errorhandler(errors, "utf-8",
-                                  "invalid start byte",
-                                  s,  pos, pos + 1)
-            result.append(r)
-        elif n == 1:
-            assert 0, "you can never get here"
-        elif n == 2:
-            # 110yyyyy 10zzzzzz   ====>  00000000 00000yyy yyzzzzzz
-
-            ordch2 = ord(s[pos+1])
-            z, two = splitter[6, 2](ordch2)
-            y, six = splitter[5, 3](ordch1)
-            assert six == 6
-            if two != 2:
-                r, pos = errorhandler(errors, "utf-8",
-                                      "invalid continuation byte",
-                                      s,  pos, pos + 1)
-                result.append(r)
-            else:
-                c = (y << 6) + z
-                result.append(unichr(c))
-                pos += n
         elif n == 3:
-            #  1110xxxx 10yyyyyy 10zzzzzz ====> 00000000 xxxxyyyy yyzzzzzz
             ordch2 = ord(s[pos+1])
             ordch3 = ord(s[pos+2])
-            z, two1 = splitter[6, 2](ordch3)
-            y, two2 = splitter[6, 2](ordch2)
-            x, fourteen = splitter[4, 4](ordch1)
-            assert fourteen == 14
-            if (two1 != 2 or two2 != 2 or
+            if (ordch2>>6 != 0b10 or
                 (ordch1 == 0xe0 and ordch2 < 0xa0)
                 # surrogates shouldn't be valid UTF-8!
                 # Uncomment the line below to make them invalid.
                 # or (ordch1 == 0xed and ordch2 > 0x9f)
                 ):
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'invalid continuation byte',
+                                      s, pos, pos+1)
+                result.append(r)
+                continue
+            elif ordch3>>6 != 0b10:
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'invalid continuation byte',
+                                      s, pos, pos+2)
+                result.append(r)
+                continue
+            # 1110xxxx 10yyyyyy 10zzzzzz -> 00000000 xxxxyyyy yyzzzzzz
+            result.append(unichr(((ordch1 & 0b00001111) << 12) +
+                                 ((ordch2 & 0b00111111) << 6) +
+                                 (ordch3 & 0b00111111)))
+            pos += 3
 
-                # if ordch2 first two bits are 1 and 0, then the invalid
-                # continuation byte is ordch3; else ordch2 is invalid.
-                if two2 == 2:
-                    endpos = pos + 2
-                else:
-                    endpos = pos + 1
-                r, pos = errorhandler(errors, "utf-8",
-                                      "invalid continuation byte",
-                                      s,  pos, endpos)
-                result.append(r)
-            else:
-                c = (x << 12) + (y << 6) + z
-                result.append(unichr(c))
-                pos += n
         elif n == 4:
-            # 11110www 10xxxxxx 10yyyyyy 10zzzzzz ====>
-            # 000wwwxx xxxxyyyy yyzzzzzz
             ordch2 = ord(s[pos+1])
             ordch3 = ord(s[pos+2])
             ordch4 = ord(s[pos+3])
-            z, two1 = splitter[6, 2](ordch4)
-            y, two2 = splitter[6, 2](ordch3)
-            x, two3 = splitter[6, 2](ordch2)
-            w, thirty = splitter[3, 5](ordch1)
-            assert thirty == 30
-            if (two1 != 2 or two2 != 2 or two3 != 2 or
+            if (ordch2>>6 != 0b10 or
                 (ordch1 == 0xf0 and ordch2 < 0x90) or
                 (ordch1 == 0xf4 and ordch2 > 0x8f)):
-                endpos = pos + 1
-                if ordch2 & 0xc0 == 0x80:
-                    endpos += 1
-                    if ordch3 & 0xc0 == 0x80:
-                        endpos += 1
-                r, pos = errorhandler(errors, "utf-8",
-                                      "invalid continuation byte",
-                                      s,  pos, endpos)
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'invalid continuation byte',
+                                      s, pos, pos+1)
                 result.append(r)
+                continue
+            elif ordch3>>6 != 0b10:
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'invalid continuation byte',
+                                      s, pos, pos+2)
+                result.append(r)
+                continue
+            elif ordch4>>6 != 0b10:
+                r, pos = errorhandler(errors, 'utf-8',
+                                      'invalid continuation byte',
+                                      s, pos, pos+3)
+                result.append(r)
+                continue
+            # 11110www 10xxxxxx 10yyyyyy 10zzzzzz -> 000wwwxx xxxxyyyy yyzzzzzz
+            c = (((ordch1 & 0b00000111) << 18) +
+                 ((ordch2 & 0b00111111) << 12) +
+                 ((ordch3 & 0b00111111) << 6) +
+                 (ordch4 & 0b00111111))
+            if c <= MAXUNICODE:
+                result.append(UNICHR(c))
             else:
-                c = (w << 18) + (x << 12) + (y << 6) + z
-                # convert to UTF-16 if necessary
-                if c <= MAXUNICODE:
-                    result.append(UNICHR(c))
-                else:
-                    # compute and append the two surrogates:
-                    # translate from 10000..10FFFF to 0..FFFF
-                    c -= 0x10000
-                    # high surrogate = top 10 bits added to D800
-                    result.append(unichr(0xD800 + (c >> 10)))
-                    # low surrogate = bottom 10 bits added to DC00
-                    result.append(unichr(0xDC00 + (c & 0x03FF)))
-                pos += n
-        else:
-            r, pos = errorhandler(errors, "utf-8",
-                                  "unsupported Unicode code range",
-                                  s,  pos, pos + n)
-            result.append(r)
+                # compute and append the two surrogates:
+                # translate from 10000..10FFFF to 0..FFFF
+                c -= 0x10000
+                # high surrogate = top 10 bits added to D800
+                result.append(unichr(0xD800 + (c >> 10)))
+                # low surrogate = bottom 10 bits added to DC00
+                result.append(unichr(0xDC00 + (c & 0x03FF)))
+            pos += 4
 
     return result.build(), pos
 
@@ -629,7 +669,7 @@
     3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,
 #  sp   !   "   #   $   %   &   '   (   )   *   +   ,   -   .   /
     2,  1,  1,  1,  1,  1,  1,  0,  0,  0,  1,  3,  0,  0,  0,  0,
-#   0   1   2   3   4   5   6   7   8   9   :   ;   <   =   >   ? 
+#   0   1   2   3   4   5   6   7   8   9   :   ;   <   =   >   ?
     0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  1,  1,  1,  0,
 #   @   A   B   C   D   E   F   G   H   I   J   K   L   M   N   O
     1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
@@ -905,20 +945,20 @@
     pos = 0
     while pos < size:
         ch = p[pos]
-        
+
         if ord(ch) < limit:
             result.append(chr(ord(ch)))
             pos += 1
         else:
             # startpos for collecting unencodable chars
-            collstart = pos 
-            collend = pos+1 
+            collstart = pos
+            collend = pos+1
             while collend < len(p) and ord(p[collend]) >= limit:
                 collend += 1
             r, pos = errorhandler(errors, encoding, reason, p,
                                   collstart, collend)
             result.append(r)
-    
+
     return result.build()
 
 def unicode_encode_latin_1(p, size, errors, errorhandler=None):


More information about the Pypy-commit mailing list