[pypy-svn] r79270 - in pypy/branch/fast-forward/pypy/module/_io: . test
afa at codespeak.net
afa at codespeak.net
Fri Nov 19 13:08:33 CET 2010
Author: afa
Date: Fri Nov 19 13:08:31 2010
New Revision: 79270
Modified:
pypy/branch/fast-forward/pypy/module/_io/__init__.py
pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
Log:
io.IncrementalNewlineDecoder
Modified: pypy/branch/fast-forward/pypy/module/_io/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/__init__.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/__init__.py Fri Nov 19 13:08:31 2010
@@ -24,7 +24,7 @@
'TextIOWrapper': 'interp_textio.W_TextIOWrapper',
'open': 'interp_io.open',
- 'IncrementalNewlineDecoder': 'space.w_None',
+ 'IncrementalNewlineDecoder': 'interp_textio.W_IncrementalNewlineDecoder',
}
def init(self, space):
Modified: pypy/branch/fast-forward/pypy/module/_io/interp_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/interp_textio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/interp_textio.py Fri Nov 19 13:08:31 2010
@@ -3,11 +3,185 @@
TypeDef, GetSetProperty, interp_attrproperty_w, interp_attrproperty,
generic_new_descr)
from pypy.interpreter.gateway import interp2app, unwrap_spec
-from pypy.interpreter.baseobjspace import ObjSpace, W_Root
+from pypy.interpreter.baseobjspace import ObjSpace, Wrappable, W_Root
from pypy.interpreter.error import OperationError, operationerrfmt
+from pypy.rlib.rstring import UnicodeBuilder
STATE_ZERO, STATE_OK, STATE_DETACHED = range(3)
+# Bit flags recording which kinds of line ending have been met so far.
+# They are OR-ed together into W_IncrementalNewlineDecoder.seennl, and
+# SEEN_ALL is the mask with all three kinds set.
+SEEN_CR = 1
+SEEN_LF = 2
+SEEN_CRLF = 4
+SEEN_ALL = SEEN_CR | SEEN_LF | SEEN_CRLF
+
+class W_IncrementalNewlineDecoder(Wrappable):
+    """Interp-level object exposed as ``_io.IncrementalNewlineDecoder``.
+
+    Sits on top of an optional incremental decoder, remembers which kinds
+    of line ending (CR, LF, CRLF) have appeared in the decoded text and,
+    when ``translate`` is true, rewrites CR and CRLF to LF.
+    """
+    # Bit mask of SEEN_CR / SEEN_LF / SEEN_CRLF accumulated by decode_w().
+    seennl = 0
+    # True while a trailing CR is held back until the next decode_w() call
+    # shows whether it is followed by LF.
+    pendingcr = False
+    # Wrapped underlying decoder; stays None until descr_init() has run.
+    w_decoder = None
+
+    def __init__(self, space):
+        pass
+
+    def _has_decoder(self, space):
+        """Tell whether a real (non-None) underlying decoder is attached."""
+        return (self.w_decoder is not None and
+                not space.is_w(self.w_decoder, space.w_None))
+
+    @unwrap_spec('self', ObjSpace, W_Root, int, W_Root)
+    def descr_init(self, space, w_decoder, translate, w_errors=None):
+        """App-level ``__init__``.
+
+        w_decoder -- wrapped incremental decoder, or the wrapped None when
+                     the input given to decode() is already unicode.
+        translate -- true to rewrite CR and CRLF to LF in the output.
+        w_errors  -- wrapped error mode; "strict" is stored when it is the
+                     wrapped None.
+        """
+        self.w_decoder = w_decoder
+        self.translate = translate
+        if space.is_w(w_errors, space.w_None):
+            self.w_errors = space.wrap("strict")
+        else:
+            self.w_errors = w_errors
+
+        # Start from a clean state, also when __init__ is run again on an
+        # object that has already decoded some data.
+        self.seennl = 0
+        self.pendingcr = False
+
+    def newlines_get_w(space, self):
+        """Getter of the ``newlines`` property.
+
+        Maps the ``seennl`` bit mask to a wrapped string (one kind seen)
+        or a wrapped tuple of strings (several kinds seen).  The lookup
+        yields None when no line ending has been seen yet (mask 0).
+        """
+        return {
+            SEEN_CR: space.wrap("\r"),
+            SEEN_LF: space.wrap("\n"),
+            SEEN_CRLF: space.wrap("\r\n"),
+            SEEN_CR | SEEN_LF: space.wrap(("\r", "\n")),
+            SEEN_CR | SEEN_CRLF: space.wrap(("\r", "\r\n")),
+            SEEN_LF | SEEN_CRLF: space.wrap(("\n", "\r\n")),
+            SEEN_CR | SEEN_LF | SEEN_CRLF: space.wrap(("\r", "\n", "\r\n")),
+            }.get(self.seennl)
+
+    @unwrap_spec('self', ObjSpace, W_Root, int)
+    def decode_w(self, space, w_input, final=False):
+        """Decode ``w_input`` and return the result as wrapped unicode.
+
+        The input goes through the underlying decoder if there is one,
+        otherwise it is used as is.  A trailing CR is withheld unless
+        ``final`` is true, the kinds of line ending met are recorded in
+        ``seennl`` and, in translate mode, CR and CRLF become LF.
+
+        Raises ValueError if __init__ has not been called, and TypeError
+        if the decoded value is not a unicode string.
+        """
+        if self.w_decoder is None:
+            raise OperationError(space.w_ValueError, space.wrap(
+                "IncrementalNewlineDecoder.__init__ not called"))
+
+        # decode input (with the eventual \r from a previous pass)
+        if not space.is_w(self.w_decoder, space.w_None):
+            w_output = space.call_method(self.w_decoder, "decode",
+                                         w_input, space.wrap(final))
+        else:
+            w_output = w_input
+
+        if not space.isinstance_w(w_output, space.w_unicode):
+            raise OperationError(space.w_TypeError, space.wrap(
+                "decoder should return a string result"))
+
+        output = space.unicode_w(w_output)
+        output_len = len(output)
+        # Put back the \r withheld by the previous call, but only once
+        # there is something to attach it to or no more input will come.
+        if self.pendingcr and (final or output_len):
+            output = u'\r' + output
+            self.pendingcr = False
+            output_len += 1
+
+        # retain last \r even when not translating data:
+        # then readline() is sure to get \r\n in one pass
+        if not final and output_len > 0:
+            last = output_len - 1
+            assert last >= 0
+            if output[last] == u'\r':
+                output = output[:last]
+                self.pendingcr = True
+                output_len -= 1
+
+        if output_len == 0:
+            return space.wrap(u"")
+
+        # Record which newlines are read and do newline translation if
+        # desired, all in one pass.
+        seennl = self.seennl
+
+        # If, up to now, newlines are consistently \n, do a quick check
+        # for the \r
+        only_lf = False
+        if seennl == SEEN_LF or seennl == 0:
+            only_lf = (output.find(u'\r') < 0)
+
+        if only_lf:
+            # If not already seen, quick scan for a possible "\n" character.
+            # (there's nothing else to be done, even when in translation mode)
+            if seennl == 0 and output.find(u'\n') >= 0:
+                seennl |= SEEN_LF
+            # Finished: we have scanned for newlines, and none of them
+            # need translating.
+        elif not self.translate:
+            # Only record the kinds of newline; the text is left untouched.
+            i = 0
+            while i < output_len:
+                if seennl == SEEN_ALL:
+                    # Every kind has been seen, nothing more to learn.
+                    break
+                c = output[i]
+                i += 1
+                if c == u'\n':
+                    seennl |= SEEN_LF
+                elif c == u'\r':
+                    if i < output_len and output[i] == u'\n':
+                        seennl |= SEEN_CRLF
+                        i += 1
+                    else:
+                        seennl |= SEEN_CR
+        elif output.find(u'\r') >= 0:
+            # Translate!
+            builder = UnicodeBuilder(output_len)
+            i = 0
+            while i < output_len:
+                c = output[i]
+                i += 1
+                if c == u'\n':
+                    seennl |= SEEN_LF
+                elif c == u'\r':
+                    if i < output_len and output[i] == u'\n':
+                        seennl |= SEEN_CRLF
+                        i += 1
+                    else:
+                        seennl |= SEEN_CR
+                    # Both \r and \r\n are emitted as a single \n.
+                    builder.append(u'\n')
+                    continue
+                builder.append(c)
+            output = builder.build()
+
+        self.seennl |= seennl
+        return space.wrap(output)
+
+    @unwrap_spec('self', ObjSpace)
+    def reset_w(self, space):
+        """Forget the newlines seen and any pending CR, then reset the
+        underlying decoder if there is one."""
+        self.seennl = 0
+        self.pendingcr = False
+        if self._has_decoder(space):
+            space.call_method(self.w_decoder, "reset")
+
+    @unwrap_spec('self', ObjSpace)
+    def getstate_w(self, space):
+        """Return the wrapped tuple ``(buffer, flag)`` describing the state.
+
+        ``buffer`` and the upper bits of ``flag`` come from the underlying
+        decoder's getstate() (an empty string and 0 when there is none).
+        The lowest bit of ``flag`` is set when a CR is pending.
+        """
+        if self._has_decoder(space):
+            w_state = space.call_method(self.w_decoder, "getstate")
+            w_buffer, w_flag = space.unpackiterable(w_state, 2)
+            flag = space.r_longlong_w(w_flag)
+        else:
+            w_buffer = space.wrap("")
+            flag = 0
+        # Make room for our own bit below the underlying decoder's flag.
+        flag <<= 1
+        if self.pendingcr:
+            flag |= 1
+        return space.newtuple([w_buffer, space.wrap(flag)])
+
+    @unwrap_spec('self', ObjSpace, W_Root)
+    def setstate_w(self, space, w_state):
+        """Restore a state produced by getstate_w().
+
+        The lowest bit of the flag becomes ``pendingcr``; the remaining
+        bits, together with the buffer, are handed to the underlying
+        decoder's setstate() if there is one.
+        """
+        w_buffer, w_flag = space.unpackiterable(w_state, 2)
+        flag = space.r_longlong_w(w_flag)
+        # Keep pendingcr a boolean, as everywhere else in this class.
+        self.pendingcr = bool(flag & 1)
+        flag >>= 1
+
+        if self._has_decoder(space):
+            w_state = space.newtuple([w_buffer, space.wrap(flag)])
+            space.call_method(self.w_decoder, "setstate", w_state)
+
+
+# App-level description of the type: its name, constructor, methods and the
+# read-only ``newlines`` property.
+W_IncrementalNewlineDecoder.typedef = TypeDef(
+    'IncrementalNewlineDecoder',
+    __new__ = generic_new_descr(W_IncrementalNewlineDecoder),
+    __init__ = interp2app(W_IncrementalNewlineDecoder.descr_init),
+
+    decode = interp2app(W_IncrementalNewlineDecoder.decode_w),
+    reset = interp2app(W_IncrementalNewlineDecoder.reset_w),
+    getstate = interp2app(W_IncrementalNewlineDecoder.getstate_w),
+    setstate = interp2app(W_IncrementalNewlineDecoder.setstate_w),
+
+    newlines = GetSetProperty(W_IncrementalNewlineDecoder.newlines_get_w),
+    )
+
class W_TextIOBase(W_IOBase):
w_encoding = None
Modified: pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py (original)
+++ pypy/branch/fast-forward/pypy/module/_io/test/test_textio.py Fri Nov 19 13:08:31 2010
@@ -28,3 +28,112 @@
t = _io.TextIOWrapper(b)
assert t.readable()
assert t.seekable()
+
+class AppTestIncrementalNewlineDecoder:
+    """App-level tests for _io.IncrementalNewlineDecoder."""
+
+    def test_newline_decoder(self):
+        """Check decoding, newline translation and the ``newlines``
+        attribute, with and without an underlying codec."""
+        import _io
+        import codecs
+
+        def check_newline_decoding_utf8(decoder):
+            # UTF-8 specific tests for a newline decoder
+            def _check_decode(b, s, **kwargs):
+                # We exercise getstate() / setstate() as well as decode()
+                state = decoder.getstate()
+                assert decoder.decode(b, **kwargs) == s
+                decoder.setstate(state)
+                assert decoder.decode(b, **kwargs) == s
+
+            _check_decode(b'\xe8\xa2\x88', u"\u8888")
+
+            _check_decode(b'\xe8', "")
+            _check_decode(b'\xa2', "")
+            _check_decode(b'\x88', u"\u8888")
+
+            _check_decode(b'\xe8', "")
+            _check_decode(b'\xa2', "")
+            _check_decode(b'\x88', u"\u8888")
+
+            _check_decode(b'\xe8', "")
+            raises(UnicodeDecodeError, decoder.decode, b'', final=True)
+
+            decoder.reset()
+            _check_decode(b'\n', "\n")
+            _check_decode(b'\r', "")
+            _check_decode(b'', "\n", final=True)
+            _check_decode(b'\r', "\n", final=True)
+
+            _check_decode(b'\r', "")
+            _check_decode(b'a', "\na")
+
+            _check_decode(b'\r\r\n', "\n\n")
+            _check_decode(b'\r', "")
+            _check_decode(b'\r', "\n")
+            _check_decode(b'\na', "\na")
+
+            _check_decode(b'\xe8\xa2\x88\r\n', u"\u8888\n")
+            _check_decode(b'\xe8\xa2\x88', u"\u8888")
+            _check_decode(b'\n', "\n")
+            _check_decode(b'\xe8\xa2\x88\r', u"\u8888")
+            _check_decode(b'\n', "\n")
+
+        def check_newline_decoding(decoder, encoding):
+            result = []
+            if encoding is not None:
+                encoder = codecs.getincrementalencoder(encoding)()
+                def _decode_bytewise(s):
+                    # Decode one byte at a time
+                    for b in encoder.encode(s):
+                        result.append(decoder.decode(b))
+            else:
+                encoder = None
+                def _decode_bytewise(s):
+                    # Decode one char at a time
+                    for c in s:
+                        result.append(decoder.decode(c))
+            assert decoder.newlines is None
+            _decode_bytewise(u"abc\n\r")
+            assert decoder.newlines == '\n'
+            _decode_bytewise(u"\nabc")
+            assert decoder.newlines == ('\n', '\r\n')
+            _decode_bytewise(u"abc\r")
+            assert decoder.newlines == ('\n', '\r\n')
+            _decode_bytewise(u"abc")
+            assert decoder.newlines == ('\r', '\n', '\r\n')
+            _decode_bytewise(u"abc\r")
+            assert "".join(result) == "abc\n\nabcabc\nabcabc"
+            decoder.reset()
+            input = u"abc"
+            if encoder is not None:
+                encoder.reset()
+                input = encoder.encode(input)
+            assert decoder.decode(input) == "abc"
+            assert decoder.newlines is None
+
+        encodings = (
+            # None meaning the IncrementalNewlineDecoder takes unicode input
+            # rather than bytes input
+            None, 'utf-8', 'latin-1',
+            'utf-16', 'utf-16-le', 'utf-16-be',
+            'utf-32', 'utf-32-le', 'utf-32-be',
+        )
+        for enc in encodings:
+            decoder = enc and codecs.getincrementaldecoder(enc)()
+            decoder = _io.IncrementalNewlineDecoder(decoder, translate=True)
+            check_newline_decoding(decoder, enc)
+        decoder = codecs.getincrementaldecoder("utf-8")()
+        decoder = _io.IncrementalNewlineDecoder(decoder, translate=True)
+        check_newline_decoding_utf8(decoder)
+
+    def test_newline_bytes(self):
+        """Characters whose code merely contains the byte 0x0D or 0x0A
+        must not be taken for newlines."""
+        import _io
+        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
+        def _check(dec):
+            assert dec.newlines is None
+            assert dec.decode(u"\u0D00") == u"\u0D00"
+            assert dec.newlines is None
+            assert dec.decode(u"\u0A00") == u"\u0A00"
+            assert dec.newlines is None
+        dec = _io.IncrementalNewlineDecoder(None, translate=False)
+        _check(dec)
+        dec = _io.IncrementalNewlineDecoder(None, translate=True)
+        _check(dec)
More information about the Pypy-commit
mailing list