[Python-3000-checkins] r57186 - in python/branches/py3k/Lib: io.py test/test_io.py test/test_univnewlines.py

guido.van.rossum python-3000-checkins at python.org
Sat Aug 18 23:39:59 CEST 2007


Author: guido.van.rossum
Date: Sat Aug 18 23:39:55 2007
New Revision: 57186

Modified:
   python/branches/py3k/Lib/io.py
   python/branches/py3k/Lib/test/test_io.py
   python/branches/py3k/Lib/test/test_univnewlines.py
Log:
New I/O code from Tony Lownds implement newline feature correctly,
and implements .newlines attribute in a 2.x-compatible fashion.


Modified: python/branches/py3k/Lib/io.py
==============================================================================
--- python/branches/py3k/Lib/io.py	(original)
+++ python/branches/py3k/Lib/io.py	Sat Aug 18 23:39:55 2007
@@ -61,10 +61,26 @@
                  can be: 0 = unbuffered, 1 = line buffered,
                  larger = fully buffered.
       encoding: optional string giving the text encoding.
-      newline: optional newlines specifier; must be None, '\n' or '\r\n';
-               specifies the line ending expected on input and written on
-               output.  If None, use universal newlines on input and
-               use os.linesep on output.
+      newline: optional newlines specifier; must be None, '', '\n', '\r'
+               or '\r\n'; all other values are illegal.  It controls the
+               handling of line endings.  It works as follows:
+
+        * On input, if `newline` is `None`, universal newlines
+          mode is enabled.  Lines in the input can end in `'\n'`,
+          `'\r'`, or `'\r\n'`, and these are translated into
+          `'\n'` before being returned to the caller.  If it is
+          `''`, universal newline mode is enabled, but line endings
+          are returned to the caller untranslated.  If it has any of
+          the other legal values, input lines are only terminated by
+          the given string, and the line ending is returned to the
+          caller untranslated.
+
+        * On output, if `newline` is `None`, any `'\n'`
+          characters written are translated to the system default
+          line separator, `os.linesep`.  If `newline` is `''`,
+          no translation takes place.  If `newline` is any of the
+          other legal values, any `'\n'` characters written are
+          translated to the given string.
 
     (*) If a file descriptor is given, it is closed when the returned
     I/O object is closed.  If you don't want this to happen, use
@@ -958,6 +974,17 @@
         """Subclasses should override."""
         return None
 
+    @property
+    def newlines(self):
+        """newlines -> None | str | tuple of str. Line endings translated
+        so far.
+
+        Only line endings translated during reading are considered.
+
+        Subclasses should override.
+        """
+        return None
+
 
 class TextIOWrapper(TextIOBase):
 
@@ -969,7 +996,7 @@
     _CHUNK_SIZE = 128
 
     def __init__(self, buffer, encoding=None, newline=None):
-        if newline not in (None, "\n", "\r\n"):
+        if newline not in (None, "", "\n", "\r", "\r\n"):
             raise ValueError("illegal newline value: %r" % (newline,))
         if encoding is None:
             try:
@@ -987,8 +1014,12 @@
 
         self.buffer = buffer
         self._encoding = encoding
-        self._newline = newline or os.linesep
-        self._fix_newlines = newline is None
+        self._readuniversal = not newline
+        self._readtranslate = newline is None
+        self._readnl = newline
+        self._writetranslate = newline != ''
+        self._writenl = newline or os.linesep
+        self._seennl = 0
         self._decoder = None
         self._pending = ""
         self._snapshot = None
@@ -1032,13 +1063,15 @@
     def write(self, s: str):
         if self.closed:
             raise ValueError("write to closed file")
+        haslf = "\n" in s
+        if haslf and self._writetranslate and self._writenl != "\n":
+            s = s.replace("\n", self._writenl)
         # XXX What if we were just reading?
         b = s.encode(self._encoding)
         if isinstance(b, str):
             b = bytes(b)
-        n = self.buffer.write(b)
-        if "\n" in s:
-            # XXX only if isatty
+        self.buffer.write(b)
+        if haslf and self.isatty():
             self.flush()
         self._snapshot = self._decoder = None
         return len(s)
@@ -1159,7 +1192,7 @@
             res += decoder.decode(self.buffer.read(), True)
             self._pending = ""
             self._snapshot = None
-            return res.replace("\r\n", "\n")
+            return self._replacenl(res)
         else:
             while len(res) < n:
                 readahead, pending = self._read_chunk()
@@ -1167,7 +1200,7 @@
                 if not readahead:
                     break
             self._pending = res[n:]
-            return res[:n].replace("\r\n", "\n")
+            return self._replacenl(res[:n])
 
     def __next__(self):
         self._telling = False
@@ -1189,59 +1222,136 @@
 
         line = self._pending
         start = 0
+        cr_eof = False
         decoder = self._decoder or self._get_decoder()
 
+        pos = endpos = None
+        ending = None
         while True:
-            # In C we'd look for these in parallel of course.
-            nlpos = line.find("\n", start)
-            crpos = line.find("\r", start)
-            if nlpos >= 0 and crpos >= 0:
-                endpos = min(nlpos, crpos)
-            else:
-                endpos = nlpos if nlpos >= 0 else crpos
-
-            if endpos != -1:
-                endc = line[endpos]
-                if endc == "\n":
-                    ending = "\n"
-                    break
+            if self._readuniversal:
+                # Universal newline search. Find any of \r, \r\n, \n
 
-                # We've seen \r - is it standalone, \r\n or \r at end of line?
-                if endpos + 1 < len(line):
-                    if line[endpos+1] == "\n":
-                        ending = "\r\n"
+                # In C we'd look for these in parallel of course.
+                nlpos = line.find("\n", start)
+                crpos = line.find("\r", start)
+                if crpos == -1:
+                    if nlpos == -1:
+                        start = len(line)
                     else:
-                        ending = "\r"
+                        # Found \n
+                        pos = nlpos
+                        endpos = pos + 1
+                        ending = self._LF
+                        break
+                elif nlpos == -1:
+                    if crpos == len(line) - 1:
+                        # Found \r at end of buffer, must keep reading
+                        start = crpos
+                        cr_eof = True
+                    else:
+                        # Found lone \r
+                        ending = self._CR
+                        pos = crpos
+                        endpos = pos + 1
+                        break
+                elif nlpos < crpos:
+                    # Found \n
+                    pos = nlpos
+                    endpos = pos + 1
+                    ending = self._LF
+                    break
+                elif nlpos == crpos + 1:
+                    # Found \r\n
+                    ending = self._CRLF
+                    pos = crpos
+                    endpos = pos + 2
+                    break
+                else:
+                    # Found \r
+                    pos = crpos
+                    endpos = pos + 1
+                    ending = self._CR
                     break
-                # There might be a following \n in the next block of data ...
-                start = endpos
             else:
-                start = len(line)
+                # non-universal
+                pos = line.find(self._readnl)
+                if pos >= 0:
+                    endpos = pos+len(self._readnl)
+                    ending = self._nlflag(self._readnl)
+                    break
 
             # No line ending seen yet - get more data
+            more_line = ''
             while True:
                 readahead, pending = self._read_chunk()
                 more_line = pending
                 if more_line or not readahead:
                     break
+            if more_line:
+                line += more_line
+            else:
+                # end of file
+                self._pending = ''
+                self._snapshot = None
+                if cr_eof:
+                    self._seennl |= self._CR
+                    return line[:-1] + '\n'
+                else:
+                    return line
 
-            if not more_line:
-                ending = ""
-                endpos = len(line)
-                break
-
-            line += more_line
-
-        nextpos = endpos + len(ending)
-        self._pending = line[nextpos:]
-
-        # XXX Update self.newlines here if we want to support that
+        self._pending = line[endpos:]
+        if self._readtranslate:
+            self._seennl |= ending
+            if ending != self._LF:
+                return line[:pos] + '\n'
+            else:
+                return line[:endpos]
+        else:
+            return line[:endpos]
 
-        if self._fix_newlines and ending not in ("\n", ""):
-            return line[:endpos] + "\n"
+    def _replacenl(self, data):
+        # Replace newlines in data as needed and record that they have
+        # been seen.
+        if not self._readtranslate:
+            return data
+        if self._readuniversal:
+            crlf = data.count('\r\n')
+            cr = data.count('\r') - crlf
+            lf = data.count('\n') - crlf
+            self._seennl |= (lf and self._LF) | (cr and self._CR) \
+                         | (crlf and self._CRLF)
+            if crlf:
+                data = data.replace("\r\n", "\n")
+            if cr:
+                data = data.replace("\r", "\n")
+        elif self._readnl == '\n':
+            # Only need to detect if \n was seen.
+            if data.count('\n'):
+                self._seennl |= self._LF
         else:
-            return line[:nextpos]
+            newdata = data.replace(self._readnl, '\n')
+            if newdata is not data:
+                self._seennl |= self._nlflag(self._readnl)
+            data = newdata
+        return data
+
+    _LF = 1
+    _CR = 2
+    _CRLF = 4
+    @property
+    def newlines(self):
+        return (None,
+                "\n",
+                "\r",
+                ("\r", "\n"),
+                "\r\n",
+                ("\n", "\r\n"),
+                ("\r", "\r\n"),
+                ("\r", "\n", "\r\n")
+               )[self._seennl]
 
+    def _nlflag(self, nlstr):
+        return [None, "\n", "\r", None, "\r\n"].index(nlstr)
 
 class StringIO(TextIOWrapper):
 

Modified: python/branches/py3k/Lib/test/test_io.py
==============================================================================
--- python/branches/py3k/Lib/test/test_io.py	(original)
+++ python/branches/py3k/Lib/test/test_io.py	Sat Aug 18 23:39:55 2007
@@ -1,5 +1,6 @@
 """Unit tests for io.py."""
 
+import os
 import sys
 import time
 import array
@@ -481,30 +482,61 @@
     def tearDown(self):
         test_support.unlink(test_support.TESTFN)
 
+    def testNewlinesInput(self):
+        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+        for newline, expected in [
+            (None, normalized.decode("ASCII").splitlines(True)),
+            ("", testdata.decode("ASCII").splitlines(True)),
+            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+            ]:
+            buf = io.BytesIO(testdata)
+            txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+            self.assertEquals(txt.readlines(), expected)
+            txt.seek(0)
+            self.assertEquals(txt.read(), "".join(expected))
+
+    def testNewlinesOutput(self):
+        testdict = {
+            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
+            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
+            }
+        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
+        for newline, expected in tests:
+            buf = io.BytesIO()
+            txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+            txt.write("AAA\nB")
+            txt.write("BB\nCCC\n")
+            txt.write("X\rY\r\nZ")
+            txt.flush()
+            self.assertEquals(buf.getvalue(), expected)
+
     def testNewlines(self):
         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
 
         tests = [
             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
-            [ '\n', input_lines ],
-            [ '\r\n', input_lines ],
+            [ '', input_lines ],
+            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
+            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
+            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
         ]
 
         encodings = ('utf-8', 'latin-1')
 
-        # Try a range of pad sizes to test the case where \r is the last
+        # Try a range of buffer sizes to test the case where \r is the last
         # character in TextIOWrapper._pending_line.
         for encoding in encodings:
+            # XXX: str.encode() should return bytes
+            data = bytes(''.join(input_lines).encode(encoding))
             for do_reads in (False, True):
-                for padlen in chain(range(10), range(50, 60)):
-                    pad = '.' * padlen
-                    data_lines = [ pad + line for line in input_lines ]
-                    # XXX: str.encode() should return bytes
-                    data = bytes(''.join(data_lines).encode(encoding))
-
-                    for newline, exp_line_ends in tests:
-                        exp_lines = [ pad + line for line in exp_line_ends ]
-                        bufio = io.BufferedReader(io.BytesIO(data))
+                for bufsize in range(1, 10):
+                    for newline, exp_lines in tests:
+                        bufio = io.BufferedReader(io.BytesIO(data), bufsize)
                         textio = io.TextIOWrapper(bufio, newline=newline,
                                                   encoding=encoding)
                         if do_reads:
@@ -522,6 +554,47 @@
                             self.assertEquals(got_line, exp_line)
                         self.assertEquals(len(got_lines), len(exp_lines))
 
+    def testNewlinesInput(self):
+        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+        for newline, expected in [
+            (None, normalized.decode("ASCII").splitlines(True)),
+            ("", testdata.decode("ASCII").splitlines(True)),
+            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+            ]:
+            buf = io.BytesIO(testdata)
+            txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+            self.assertEquals(txt.readlines(), expected)
+            txt.seek(0)
+            self.assertEquals(txt.read(), "".join(expected))
+
+    def testNewlinesOutput(self):
+        import os
+        orig_linesep = os.linesep
+        data = "AAA\nBBB\rCCC\n"
+        data_lf = b"AAA\nBBB\rCCC\n"
+        data_cr = b"AAA\rBBB\rCCC\r"
+        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
+        for os.linesep, newline, expected in [
+            ("\n", None, data_lf),
+            ("\r\n", None, data_crlf),
+            ("\n", "", data_lf),
+            ("\r\n", "", data_lf),
+            ("\n", "\n", data_lf),
+            ("\r\n", "\n", data_lf),
+            ("\n", "\r", data_cr),
+            ("\r\n", "\r", data_cr),
+            ("\n", "\r\n", data_crlf),
+            ("\r\n", "\r\n", data_crlf),
+            ]:
+            buf = io.BytesIO()
+            txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+            txt.write(data)
+            txt.close()
+            self.assertEquals(buf.getvalue(), expected)
+
     # Systematic tests of the text I/O API
 
     def testBasicIO(self):

Modified: python/branches/py3k/Lib/test/test_univnewlines.py
==============================================================================
--- python/branches/py3k/Lib/test/test_univnewlines.py	(original)
+++ python/branches/py3k/Lib/test/test_univnewlines.py	Sat Aug 18 23:39:55 2007
@@ -12,9 +12,8 @@
 
 DATA_TEMPLATE = [
     "line1=1",
-    "line2='this is a very long line designed to go past the magic " +
-        "hundred character limit that is inside fileobject.c and which " +
-        "is meant to speed up the common case, but we also want to test " +
+    "line2='this is a very long line designed to go past any default " +
+        "buffer limits that exist in io.py but we also want to test " +
         "the uncommon case, naturally.'",
     "def line3():pass",
     "line4 = '%s'" % FATX,
@@ -32,7 +31,7 @@
 class TestGenericUnivNewlines(unittest.TestCase):
     # use a class variable DATA to define the data to write to the file
     # and a class variable NEWLINE to set the expected newlines value
-    READMODE = 'U'
+    READMODE = 'r'
     WRITEMODE = 'wb'
 
     def setUp(self):
@@ -79,12 +78,6 @@
         self.assertEqual(data, DATA_SPLIT[1:])
 
 
-class TestNativeNewlines(TestGenericUnivNewlines):
-    NEWLINE = None
-    DATA = DATA_LF
-    READMODE = 'r'
-    WRITEMODE = 'w'
-
 class TestCRNewlines(TestGenericUnivNewlines):
     NEWLINE = '\r'
     DATA = DATA_CR
@@ -104,7 +97,6 @@
 
 def test_main():
     test_support.run_unittest(
-        TestNativeNewlines,
         TestCRNewlines,
         TestLFNewlines,
         TestCRLFNewlines,


More information about the Python-3000-checkins mailing list