[pypy-svn] r79270 - in pypy/branch/fast-forward/pypy/module/_io: . test

afa at codespeak.net afa at codespeak.net
Fri Nov 19 13:08:33 CET 2010


Author: afa
Date: Fri Nov 19 13:08:31 2010
New Revision: 79270

Modified:
   pypy/branch/fast-forward/pypy/module/_io/__init__.py
   pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
   pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
Log:
io.IncrementalNewlineDecoder


Modified: pypy/branch/fast-forward/pypy/module/_io/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/__init__.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/__init__.py	Fri Nov 19 13:08:31 2010
@@ -24,7 +24,7 @@
         'TextIOWrapper': 'interp_textio.W_TextIOWrapper',
 
         'open': 'interp_io.open',
-        'IncrementalNewlineDecoder': 'space.w_None',
+        'IncrementalNewlineDecoder': 'interp_textio.W_IncrementalNewlineDecoder',
         }
 
     def init(self, space):

Modified: pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_textio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_textio.py	Fri Nov 19 13:08:31 2010
@@ -3,11 +3,185 @@
     TypeDef, GetSetProperty, interp_attrproperty_w, interp_attrproperty,
     generic_new_descr)
 from pypy.interpreter.gateway import interp2app, unwrap_spec
-from pypy.interpreter.baseobjspace import ObjSpace, W_Root
+from pypy.interpreter.baseobjspace import ObjSpace, Wrappable, W_Root
 from pypy.interpreter.error import OperationError, operationerrfmt
+from pypy.rlib.rstring import UnicodeBuilder
 
 STATE_ZERO, STATE_OK, STATE_DETACHED = range(3)
 
+SEEN_CR   = 1
+SEEN_LF   = 2
+SEEN_CRLF = 4
+SEEN_ALL  = SEEN_CR | SEEN_LF | SEEN_CRLF
+
+class W_IncrementalNewlineDecoder(Wrappable):
+    seennl = 0
+    pendingcr = False
+    w_decoder = None
+
+    def __init__(self, space):
+        pass  # nothing to set up here; descr_init assigns the instance state
+
+    @unwrap_spec('self', ObjSpace, W_Root, int, W_Root)
+    def descr_init(self, space, w_decoder, translate, w_errors=None):
+        self.w_decoder = w_decoder  # wrapped decoder object, or app-level None
+        self.translate = translate  # non-zero: decode_w rewrites \r and \r\n to \n
+        if w_errors is None or space.is_w(w_errors, space.w_None):
+            self.w_errors = space.wrap("strict")  # errors omitted or None
+        else:
+            self.w_errors = w_errors
+
+        self.seennl = 0  # no newline kind recorded yet
+        self.pendingcr = False  # must be set on self, not on a throwaway local
+
+    def newlines_get_w(space, self):
+        return {  # one entry per non-zero combination of the SEEN_* bits
+            SEEN_CR: space.wrap("\r"),
+            SEEN_LF: space.wrap("\n"),
+            SEEN_CRLF: space.wrap("\r\n"),
+            SEEN_CR | SEEN_LF: space.wrap(("\r", "\n")),
+            SEEN_CR | SEEN_CRLF: space.wrap(("\r", "\r\n")),
+            SEEN_LF | SEEN_CRLF: space.wrap(("\n", "\r\n")),
+            SEEN_CR | SEEN_LF | SEEN_CRLF: space.wrap(("\r", "\n", "\r\n")),
+            }.get(self.seennl)  # no entry (e.g. seennl == 0): .get() gives None
+
+    @unwrap_spec('self', ObjSpace, W_Root, int)
+    def decode_w(self, space, w_input, final=False):
+        if self.w_decoder is None:  # still the class default: __init__ not run
+            raise OperationError(space.w_ValueError, space.wrap(
+                "IncrementalNewlineDecoder.__init__ not called"))
+
+        # decode input; a \r held back by a previous pass is re-added below
+        if not space.is_w(self.w_decoder, space.w_None):
+            w_output = space.call_method(self.w_decoder, "decode",
+                                         w_input, space.wrap(final))
+        else:
+            w_output = w_input  # no wrapped decoder: input is used as-is
+
+        if not space.isinstance_w(w_output, space.w_unicode):
+            raise OperationError(space.w_TypeError, space.wrap(
+                "decoder should return a string result"))
+
+        output = space.unicode_w(w_output)
+        output_len = len(output)
+        if self.pendingcr and (final or output_len):
+            output = u'\r' + output
+            self.pendingcr = False
+            output_len += 1
+
+        # retain last \r even when not translating data:
+        # then readline() is sure to get \r\n in one pass
+        if not final and output_len > 0:
+            last = output_len - 1
+            assert last >= 0
+            if output[last] == u'\r':
+                output = output[:last]
+                self.pendingcr = True
+                output_len -= 1
+
+        if output_len == 0:
+            return space.wrap(u"")
+
+        # Record which newlines are read and do newline translation if
+        # desired, all in one pass.
+        seennl = self.seennl
+
+        # If, up to now, newlines are consistently \n, do a quick check
+        # for the \r
+        only_lf = False
+        if seennl == SEEN_LF or seennl == 0:
+            only_lf = (output.find(u'\r') < 0)
+
+        if only_lf:
+            # If not already seen, quick scan for a possible "\n" character.
+            # (there's nothing else to be done, even when in translation mode)
+            if seennl == 0 and output.find(u'\n') >= 0:
+                seennl |= SEEN_LF
+                # Finished: we have scanned for newlines, and none of them
+                # need translating.
+        elif not self.translate:
+            i = 0
+            while i < output_len:
+                if seennl == SEEN_ALL:  # every kind already recorded: stop early
+                    break
+                c = output[i]
+                i += 1
+                if c == u'\n':
+                    seennl |= SEEN_LF
+                elif c == u'\r':
+                    if i < output_len and output[i] == u'\n':
+                        seennl |= SEEN_CRLF
+                        i += 1  # step over the \n of the \r\n pair
+                    else:
+                        seennl |= SEEN_CR
+        elif output.find(u'\r') >= 0:
+            # Translate!
+            builder = UnicodeBuilder(output_len)
+            i = 0
+            while i < output_len:
+                c = output[i]
+                i += 1
+                if c == u'\n':
+                    seennl |= SEEN_LF
+                elif c == u'\r':
+                    if i < output_len and output[i] == u'\n':
+                        seennl |= SEEN_CRLF
+                        i += 1  # step over the \n of the \r\n pair
+                    else:
+                        seennl |= SEEN_CR
+                    builder.append(u'\n')  # \r and \r\n both come out as \n
+                    continue
+                builder.append(c)
+            output = builder.build()
+
+        self.seennl |= seennl  # merge this call's findings into the instance
+        return space.wrap(output)
+
+    @unwrap_spec('self', ObjSpace)
+    def reset_w(self, space):
+        self.seennl = 0  # forget which newline kinds were seen
+        self.pendingcr = False  # drop any \r held back by decode_w
+        if self.w_decoder and not space.is_w(self.w_decoder, space.w_None):
+            space.call_method(self.w_decoder, "reset")  # forward to the decoder
+
+    @unwrap_spec('self', ObjSpace)
+    def getstate_w(self, space):
+        if self.w_decoder and not space.is_w(self.w_decoder, space.w_None):
+            w_state = space.call_method(self.w_decoder, "getstate")
+            w_buffer, w_flag = space.unpackiterable(w_state, 2)
+            flag = space.r_longlong_w(w_flag)
+        else:
+            w_buffer = space.wrap("")  # no wrapped decoder: empty buffer, flag 0
+            flag = 0
+        flag <<= 1  # free bit 0 of the returned flag for pendingcr
+        if self.pendingcr:
+            flag |= 1
+        return space.newtuple([w_buffer, space.wrap(flag)])
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def setstate_w(self, space, w_state):
+        w_buffer, w_flag = space.unpackiterable(w_state, 2)
+        flag = space.r_longlong_w(w_flag)
+        self.pendingcr = bool(flag & 1)  # bit 0 is pendingcr; keep it a bool
+        flag >>= 1  # the remaining bits are the wrapped decoder's own flag
+
+        if self.w_decoder and not space.is_w(self.w_decoder, space.w_None):
+            w_state = space.newtuple([w_buffer, space.wrap(flag)])
+            space.call_method(self.w_decoder, "setstate", w_state)
+
+W_IncrementalNewlineDecoder.typedef = TypeDef(
+    'IncrementalNewlineDecoder',  # app-level type name (was 'TextIOWrapper')
+    __new__ = generic_new_descr(W_IncrementalNewlineDecoder),
+    __init__  = interp2app(W_IncrementalNewlineDecoder.descr_init),
+
+    decode = interp2app(W_IncrementalNewlineDecoder.decode_w),
+    reset = interp2app(W_IncrementalNewlineDecoder.reset_w),
+    getstate = interp2app(W_IncrementalNewlineDecoder.getstate_w),
+    setstate = interp2app(W_IncrementalNewlineDecoder.setstate_w),
+
+    newlines = GetSetProperty(W_IncrementalNewlineDecoder.newlines_get_w),
+    )
+
 class W_TextIOBase(W_IOBase):
     w_encoding = None
 

Modified: pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py	Fri Nov 19 13:08:31 2010
@@ -28,3 +28,112 @@
         t = _io.TextIOWrapper(b)
         assert t.readable()
         assert t.seekable()
+
+class AppTestIncrementalNewlineDecoder:
+
+    def test_newline_decoder(self):
+        import _io
+        def check_newline_decoding_utf8(decoder):
+            # UTF-8 specific tests for a newline decoder
+            def _check_decode(b, s, **kwargs):
+                # We exercise getstate() / setstate() as well as decode()
+                state = decoder.getstate()
+                assert decoder.decode(b, **kwargs) == s
+                decoder.setstate(state)
+                assert decoder.decode(b, **kwargs) == s
+
+            _check_decode(b'\xe8\xa2\x88', u"\u8888")
+
+            _check_decode(b'\xe8', "")  # fed one byte at a time
+            _check_decode(b'\xa2', "")
+            _check_decode(b'\x88', u"\u8888")
+
+            _check_decode(b'\xe8', "")
+            _check_decode(b'\xa2', "")
+            _check_decode(b'\x88', u"\u8888")
+
+            _check_decode(b'\xe8', "")  # leaves an incomplete sequence pending
+            raises(UnicodeDecodeError, decoder.decode, b'', final=True)
+
+            decoder.reset()
+            _check_decode(b'\n', "\n")
+            _check_decode(b'\r', "")  # a trailing \r is held back
+            _check_decode(b'', "\n", final=True)
+            _check_decode(b'\r', "\n", final=True)
+
+            _check_decode(b'\r', "")
+            _check_decode(b'a', "\na")
+
+            _check_decode(b'\r\r\n', "\n\n")
+            _check_decode(b'\r', "")
+            _check_decode(b'\r', "\n")
+            _check_decode(b'\na', "\na")
+
+            _check_decode(b'\xe8\xa2\x88\r\n', u"\u8888\n")
+            _check_decode(b'\xe8\xa2\x88', u"\u8888")
+            _check_decode(b'\n', "\n")
+            _check_decode(b'\xe8\xa2\x88\r', u"\u8888")
+            _check_decode(b'\n', "\n")
+
+        def check_newline_decoding(decoder, encoding):
+            result = []  # collects every decode() output, joined and checked below
+            if encoding is not None:
+                encoder = codecs.getincrementalencoder(encoding)()
+                def _decode_bytewise(s):
+                    # Decode one byte at a time
+                    for b in encoder.encode(s):
+                        result.append(decoder.decode(b))
+            else:
+                encoder = None
+                def _decode_bytewise(s):
+                    # Decode one char at a time
+                    for c in s:
+                        result.append(decoder.decode(c))
+            assert decoder.newlines == None
+            _decode_bytewise(u"abc\n\r")
+            assert decoder.newlines == '\n'
+            _decode_bytewise(u"\nabc")
+            assert decoder.newlines == ('\n', '\r\n')
+            _decode_bytewise(u"abc\r")
+            assert decoder.newlines == ('\n', '\r\n')
+            _decode_bytewise(u"abc")
+            assert decoder.newlines == ('\r', '\n', '\r\n')
+            _decode_bytewise(u"abc\r")
+            assert "".join(result) == "abc\n\nabcabc\nabcabc"
+            decoder.reset()
+            input = u"abc"
+            if encoder is not None:
+                encoder.reset()
+                input = encoder.encode(input)
+            assert decoder.decode(input) == "abc"
+            assert decoder.newlines is None
+
+        encodings = (
+            # None meaning the IncrementalNewlineDecoder takes unicode input
+            # rather than bytes input
+            None, 'utf-8', 'latin-1',
+            'utf-16', 'utf-16-le', 'utf-16-be',
+            'utf-32', 'utf-32-le', 'utf-32-be',
+        )
+        import codecs  # bound here, before the helpers above are first called
+        for enc in encodings:
+            decoder = enc and codecs.getincrementaldecoder(enc)()
+            decoder = _io.IncrementalNewlineDecoder(decoder, translate=True)
+            check_newline_decoding(decoder, enc)
+        decoder = codecs.getincrementaldecoder("utf-8")()
+        decoder = _io.IncrementalNewlineDecoder(decoder, translate=True)
+        check_newline_decoding_utf8(decoder)
+
+    def test_newline_bytes(self):
+        import _io
+        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
+        def _check(dec):
+            assert dec.newlines is None
+            assert dec.decode(u"\u0D00") == u"\u0D00"  # U+0D00, not u"\r"
+            assert dec.newlines is None
+            assert dec.decode(u"\u0A00") == u"\u0A00"  # U+0A00, not u"\n"
+            assert dec.newlines is None
+        dec = _io.IncrementalNewlineDecoder(None, translate=False)
+        _check(dec)  # same expectations with and without translation
+        dec = _io.IncrementalNewlineDecoder(None, translate=True)
+        _check(dec)



More information about the Pypy-commit mailing list