[pypy-svn] pypy default: Added newline support to string io.

alex_gaynor commits-noreply at bitbucket.org
Thu Feb 3 23:13:03 CET 2011


Author: Alex Gaynor <alex.gaynor at gmail.com>
Branch: 
Changeset: r41588:b1399d25389c
Date: 2011-02-03 17:12 -0500
http://bitbucket.org/pypy/pypy/changeset/b1399d25389c/

Log:	Added newline support to string io.

diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py
--- a/pypy/module/_io/interp_textio.py
+++ b/pypy/module/_io/interp_textio.py
@@ -182,7 +182,7 @@
             space.call_method(self.w_decoder, "setstate", w_state)
 
 W_IncrementalNewlineDecoder.typedef = TypeDef(
-    'TextIOWrapper',
+    'IncrementalNewlineDecoder',
     __new__ = generic_new_descr(W_IncrementalNewlineDecoder),
     __init__  = interp2app(W_IncrementalNewlineDecoder.descr_init),
 
@@ -224,6 +224,49 @@
     def errors_get_w(space, self):
         return space.w_None
 
+
+    def _find_line_ending(self, line, start, end):
+        size = end - start
+        if self.readtranslate:
+
+            # Newlines are already translated, only search for \n
+            pos = line.find(u'\n', start, end)
+            if pos >= 0:
+                return pos - start + 1, 0
+            else:
+                return -1, size
+        elif self.readuniversal:
+            # Universal newline search. Find any of \r, \r\n, \n
+            # The decoder ensures that \r\n are not split in two pieces
+            i = 0
+            while True:
+                # Fast path for non-control chars. The loop always ends
+                # because the `i < size` check bounds the scan.
+                while i < size and line[start + i] > '\r':
+                    i += 1
+                if i >= size:
+                    return -1, size
+                ch = line[start + i]
+                i += 1
+                if ch == '\n':
+                    return i, 0
+                if ch == '\r':
+                    if line[start + i] == '\n':
+                        return i + 1, 0
+                    else:
+                        return i, 0
+        else:
+            # Non-universal mode.
+            pos = line.find(self.readnl, start, end)
+            if pos >= 0:
+                return pos - start + len(self.readnl), 0
+            else:
+                pos = line.find(self.readnl[0], start, end)
+                if pos >= 0:
+                    return -1, pos - start
+                return -1, size
+
+
 W_TextIOBase.typedef = TypeDef(
     '_TextIOBase', W_IOBase.typedef,
     __new__ = generic_new_descr(W_TextIOBase),
@@ -583,48 +626,6 @@
 
         return space.wrap(builder.build())
 
-    def _find_line_ending(self, line, start):
-        end = len(line)
-        size = end - start
-        if self.readtranslate:
-
-            # Newlines are already translated, only search for \n
-            pos = line.find(u'\n', start, end)
-            if pos >= 0:
-                return pos - start + 1, 0
-            else:
-                return -1, size
-        elif self.readuniversal:
-            # Universal newline search. Find any of \r, \r\n, \n
-            # The decoder ensures that \r\n are not split in two pieces
-            i = 0
-            while True:
-                # Fast path for non-control chars. The loop always ends
-                # since the Py_UNICODE storage is NUL-terminated.
-                while i < size and line[start + i] > '\r':
-                    i += 1
-                if i >= size:
-                    return -1, size
-                ch = line[start + i]
-                i += 1
-                if ch == '\n':
-                    return i, 0
-                if ch == '\r':
-                    if line[start + i] == '\n':
-                        return i + 1, 0
-                    else:
-                        return i, 0
-        else:
-            # Non-universal mode.
-            pos = line.find(self.readnl, start, end)
-            if pos >= 0:
-                return pos - start + len(self.readnl), 0
-            else:
-                pos = line.find(self.readnl[0], start, end)
-                if pos >= 0:
-                    return -1, pos - start
-                return -1, size
-
     @unwrap_spec('self', ObjSpace, W_Root)
     def readline_w(self, space, w_limit=None):
         self._check_closed(space)
@@ -663,7 +664,7 @@
                 remaining = None
 
             line_len = len(line)
-            endpos, consumed = self._find_line_ending(line, start)
+            endpos, consumed = self._find_line_ending(line, start, line_len)
             if endpos >= 0:
                 endpos += start
                 if limit >= 0 and endpos >= start + limit - chunked:

diff --git a/pypy/module/_io/test/test_stringio.py b/pypy/module/_io/test/test_stringio.py
--- a/pypy/module/_io/test/test_stringio.py
+++ b/pypy/module/_io/test/test_stringio.py
@@ -143,3 +143,87 @@
         import io
 
         assert io.StringIO.__module__ == "_io"
+
+    def test_newline_none(self):
+        import io
+
+        sio = io.StringIO(u"a\nb\r\nc\rd", newline=None)
+        res = list(sio)
+        assert res == [u"a\n", u"b\n", u"c\n", u"d"]
+        sio.seek(0)
+        res = sio.read(1)
+        assert res == u"a"
+        res = sio.read(2)
+        assert res == u"\nb"
+        res = sio.read(2)
+        assert res == u"\nc"
+        res = sio.read(1)
+        assert res == u"\n"
+
+        sio = io.StringIO(newline=None)
+        res = sio.write(u"a\n")
+        assert res == 2
+        res = sio.write(u"b\r\n")
+        assert res == 3
+        res = sio.write(u"c\rd")
+        assert res == 3
+        sio.seek(0)
+        res = sio.read()
+        assert res == u"a\nb\nc\nd"
+        sio = io.StringIO(u"a\r\nb", newline=None)
+        res = sio.read(3)
+        assert res == u"a\nb"
+
+    def test_newline_empty(self):
+        import io
+
+        sio = io.StringIO(u"a\nb\r\nc\rd", newline="")
+        res = list(sio)
+        assert res == [u"a\n", u"b\r\n", u"c\r", u"d"]
+        sio.seek(0)
+        res = sio.read(4)
+        assert res == u"a\nb\r"
+        res = sio.read(2)
+        assert res == u"\nc"
+        res = sio.read(1)
+        assert res == u"\r"
+
+        sio = io.StringIO(newline="")
+        res = sio.write(u"a\n")
+        assert res == 2
+        res = sio.write(u"b\r")
+        assert res == 2
+        res = sio.write(u"\nc")
+        assert res == 2
+        res = sio.write(u"\rd")
+        assert res == 2
+        sio.seek(0)
+        res = list(sio)
+        assert res == [u"a\n", u"b\r\n", u"c\r", u"d"]
+
+    def test_newline_lf(self):
+        import io
+
+        sio = io.StringIO(u"a\nb\r\nc\rd")
+        res = list(sio)
+        assert res == [u"a\n", u"b\r\n", u"c\rd"]
+
+    def test_newline_cr(self):
+        import io
+
+        sio = io.StringIO(u"a\nb\r\nc\rd", newline="\r")
+        res = sio.read()
+        assert res == u"a\rb\r\rc\rd"
+        sio.seek(0)
+        res = list(sio)
+        assert res == [u"a\r", u"b\r", u"\r", u"c\r", u"d"]
+
+    def test_newline_crlf(self):
+        import io
+
+        sio = io.StringIO(u"a\nb\r\nc\rd", newline="\r\n")
+        res = sio.read()
+        assert res == u"a\r\nb\r\r\nc\rd"
+        sio.seek(0)
+        res = list(sio)
+        assert res == [u"a\r\n", u"b\r\r\n", u"c\rd"]

diff --git a/pypy/module/_io/interp_stringio.py b/pypy/module/_io/interp_stringio.py
--- a/pypy/module/_io/interp_stringio.py
+++ b/pypy/module/_io/interp_stringio.py
@@ -3,7 +3,7 @@
 from pypy.interpreter.gateway import interp2app, unwrap_spec
 from pypy.interpreter.error import OperationError, operationerrfmt
 from pypy.interpreter.baseobjspace import ObjSpace, W_Root
-from pypy.module._io.interp_textio import W_TextIOBase
+from pypy.module._io.interp_textio import W_TextIOBase, W_IncrementalNewlineDecoder
 from pypy.module._io.interp_iobase import convert_size
 
 
@@ -13,11 +13,32 @@
         self.buf = []
         self.pos = 0
 
-    @unwrap_spec('self', ObjSpace, W_Root)
-    def descr_init(self, space, w_initvalue=None):
+    @unwrap_spec('self', ObjSpace, W_Root, "str_or_None")
+    def descr_init(self, space, w_initvalue=None, newline="\n"):
         # In case __init__ is called multiple times
         self.buf = []
         self.pos = 0
+        self.w_decoder = None
+        self.readnl = None
+        self.writenl = None
+
+        if (newline is not None and newline != "" and newline != "\n" and
+            newline != "\r" and newline != "\r\n"):
+            raise operationerrfmt(space.w_ValueError,
+                "illegal newline value: %s", newline
+            )
+        if newline is not None:
+            self.readnl = newline
+        self.readuniversal = newline is None or newline == ""
+        self.readtranslate = newline is None
+        if newline and newline[0] == "\r":
+            self.writenl = newline
+        if self.readuniversal:
+            self.w_decoder = space.call_function(
+                space.gettypefor(W_IncrementalNewlineDecoder),
+                space.w_None,
+                space.wrap(int(self.readtranslate))
+            )
 
         if not space.is_w(w_initvalue, space.w_None):
             self.write_w(space, w_initvalue)
@@ -55,11 +76,27 @@
                                   "string argument expected, got '%s'",
                                   space.type(w_obj).getname(space, '?'))
         self._check_closed(space)
-        string = space.unicode_w(w_obj)
+
+        orig_size = space.int_w(space.len(w_obj))
+
+        if self.w_decoder is not None:
+            w_decoded = space.call_method(
+                self.w_decoder, "decode", w_obj, space.w_True
+            )
+        else:
+            w_decoded = w_obj
+
+        if self.writenl:
+            w_decoded = space.call_method(
+                w_decoded, "replace", space.wrap("\n"), space.wrap(self.writenl)
+            )
+
+        string = space.unicode_w(w_decoded)
         size = len(string)
+
         if size:
             self.write(string)
-        return space.wrap(size)
+        return space.wrap(orig_size)
 
     @unwrap_spec('self', ObjSpace, W_Root)
     def read_w(self, space, w_size=None):
@@ -77,6 +114,30 @@
         self.pos = end
         return space.wrap(u''.join(self.buf[start:end]))
 
+    @unwrap_spec('self', ObjSpace, int)
+    def readline_w(self, space, limit=-1):
+        if self.pos >= len(self.buf):
+            return space.wrap(u"")
+
+        start = self.pos
+        if limit < 0 or limit > len(self.buf) - self.pos:
+            limit = len(self.buf) - self.pos
+
+        end = start + limit
+
+        endpos, consumed = self._find_line_ending(
+            # XXX: super inefficient, makes a copy of the entire contents.
+            "".join(self.buf),
+            start,
+            end
+        )
+        if endpos >= 0:
+            endpos += start
+        else:
+            endpos = end
+        self.pos = endpos
+        return space.wrap("".join(self.buf[start:endpos]))
+
     @unwrap_spec('self', ObjSpace, int, int)
     def seek_w(self, space, pos, mode=0):
         self._check_closed(space)
@@ -149,6 +210,7 @@
     def line_buffering_get_w(space, self):
         return space.w_False
 
+
 W_StringIO.typedef = TypeDef(
     'StringIO', W_TextIOBase.typedef,
     __module__ = "_io",
@@ -156,6 +218,7 @@
     __init__ = interp2app(W_StringIO.descr_init),
     write = interp2app(W_StringIO.write_w),
     read = interp2app(W_StringIO.read_w),
+    readline = interp2app(W_StringIO.readline_w),
     seek = interp2app(W_StringIO.seek_w),
     truncate = interp2app(W_StringIO.truncate_w),
     getvalue = interp2app(W_StringIO.getvalue_w),


More information about the Pypy-commit mailing list