[Python-3000-checkins] r54701 - in python/branches/p3yk/Lib: io.py test/test_io.py

guido.van.rossum python-3000-checkins at python.org
Fri Apr 6 19:31:21 CEST 2007


Author: guido.van.rossum
Date: Fri Apr  6 19:31:18 2007
New Revision: 54701

Modified:
   python/branches/p3yk/Lib/io.py
   python/branches/p3yk/Lib/test/test_io.py
Log:
Added a working Text I/O layer, by Mark Russell.
This is essentially a checkpoint.


Modified: python/branches/p3yk/Lib/io.py
==============================================================================
--- python/branches/p3yk/Lib/io.py	(original)
+++ python/branches/p3yk/Lib/io.py	Fri Apr  6 19:31:18 2007
@@ -3,7 +3,7 @@
 This is an early prototype; eventually some of this will be
 reimplemented in C and the rest may be turned into a package.
 
-See PEP XXX; for now: http://docs.google.com/Doc?id=dfksfvqd_1cn5g5m
+See PEP 3116.
 
 XXX need to default buffer size to 1 if isatty()
 XXX need to support 1 meaning line-buffered
@@ -11,20 +11,24 @@
 """
 
 __author__ = ("Guido van Rossum <guido at python.org>, "
-              "Mike Verdone <mike.verdone at gmail.com>")
+              "Mike Verdone <mike.verdone at gmail.com>, "
+              "Mark Russell <mark.russell at zen.co.uk>")
 
 __all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO",
            "BufferedReader", "BufferedWriter", "BufferedRWPair",
-           "BufferedRandom", "EOF"]
+           "BufferedRandom"]
 
 import os
+import sys
+import codecs
+import warnings
 
 DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes
 DEFAULT_MAX_BUFFER_SIZE = 16 * 1024 # bytes
-EOF = b''  # XXX This is wrong because it's mutable
 
 
 class BlockingIO(IOError):
+
     def __init__(self, errno, strerror, characters_written):
         IOError.__init__(self, errno, strerror)
         self.characters_written = characters_written
@@ -125,14 +129,12 @@
     seeked.
 
     The read() method is implemented by calling readinto(); derived
-    classes that want to support readon only need to implement
+    classes that want to support read() only need to implement
     readinto() as a primitive operation.
     """
 
-    # XXX Add individual method docstrings
-
     def read(self, n):
-        """Read and return up to n bytes.
+        """read(n: int) -> bytes.  Read and return up to n bytes.
 
         Returns an empty bytes array on EOF, or None if the object is
         set not to block and has no data to read.
@@ -143,43 +145,80 @@
         return b
 
     def readinto(self, b):
+        """readinto(b: bytes) -> int.  Read up to len(b) bytes into b.
+
+        Returns number of bytes read (0 for EOF), or None if the object
+        is set not to block and has no data to read.
+        """
         raise IOError(".readinto() not supported")
 
     def write(self, b):
-        """Write the given buffer to the IO stream.
+        """write(b: bytes) -> int.  Write the given buffer to the IO stream.
 
-        Returns the number of bytes written.
+        Returns the number of bytes written, which may be less than len(b).
         """
         raise IOError(".write() not supported")
 
     def seek(self, pos, whence=0):
+        """seek(pos: int, whence: int = 0) -> None.  Change stream position.
+
+        Seek to byte offset pos relative to position indicated by whence:
+             0  Start of stream (the default).  pos should be >= 0;
+             1  Current position - pos may be negative;
+             2  End of stream - pos usually negative.
+        """
         raise IOError(".seek() not supported")
 
     def tell(self):
+        """tell() -> int.  Return current stream position."""
         raise IOError(".tell() not supported")
 
     def truncate(self, pos=None):
+        """truncate(size: int = None) -> None. Truncate file to size bytes.
+
+        Size defaults to the current IO position as reported by tell().
+        """
         raise IOError(".truncate() not supported")
 
     def close(self):
+        """close() -> None.  Close IO object."""
         pass
 
     def seekable(self):
+        """seekable() -> bool.  Return whether object supports random access.
+
+        If False, seek(), tell() and truncate() will raise IOError.
+        This method may need to do a test seek().
+        """
         return False
 
     def readable(self):
+        """readable() -> bool.  Return whether object was opened for reading.
+
+        If False, read() will raise IOError.
+        """
         return False
 
     def writable(self):
+        """writable() -> bool.  Return whether object was opened for writing.
+
+        If False, write() and truncate() will raise IOError.
+        """
         return False
 
     def __enter__(self):
+        """Context management protocol.  Returns self."""
         return self
 
     def __exit__(self, *args):
+        """Context management protocol.  Same as close()"""
         self.close()
 
     def fileno(self):
+        """fileno() -> int.  Return underlying file descriptor if there is one.
+
+        Raises IOError if the IO object does not use a file descriptor.
+        """
         raise IOError(".fileno() not supported")
 
 
@@ -252,6 +291,8 @@
     import _fileio
 except ImportError:
     # Let's use the Python version
+    warnings.warn("Can't import _fileio, using slower Python lookalike",
+                  RuntimeWarning)
     FileIO = _PyFileIO
 else:
     # Create a trivial subclass with the proper inheritance structure
@@ -295,17 +336,13 @@
     """XXX Docstring."""
 
 
-class BytesIO(BufferedIOBase):
+class _MemoryBufferMixin:
 
-    """Buffered I/O implementation using a bytes buffer, like StringIO."""
-
-    # XXX More docs
+    # XXX docstring
 
-    def __init__(self, inital_bytes=None):
-        self._buffer = b""
+    def __init__(self, buffer):
+        self._buffer = buffer
         self._pos = 0
-        if inital_bytes is not None:
-            self._buffer += inital_bytes
 
     def getvalue(self):
         return self._buffer
@@ -362,6 +399,35 @@
         return True
 
 
+class BytesIO(_MemoryBufferMixin, BufferedIOBase):
+
+    """Buffered I/O implementation using a bytes buffer, like StringIO."""
+
+    # XXX More docs
+
+    def __init__(self, inital_bytes=None):
+        buffer = b""
+        if inital_bytes is not None:
+            buffer += inital_bytes
+        _MemoryBufferMixin.__init__(self, buffer)
+
+
+class StringIO(_MemoryBufferMixin, BufferedIOBase):
+
+    """Buffered I/O implementation using a string buffer, like StringIO."""
+
+    # XXX More docs
+
+    # XXX Reuses the same code as BytesIO, just with a string rather
+    # than bytes as the _buffer value.  That won't work in C of course.
+
+    def __init__(self, inital_string=None):
+        buffer = ""
+        if inital_string is not None:
+            buffer += inital_string
+        _MemoryBufferMixin.__init__(self, buffer)
+
+
 class BufferedIOBase(RawIOBase):
 
     """Base class for buffered IO objects."""
@@ -375,15 +441,17 @@
 
     """Buffer for a readable sequential RawIO object.
 
-    Does not allow random access (seek, tell).
+    Does not allow random access (seek, tell).  (Use BufferedRandom
+    for that.)
     """
 
-    def __init__(self, raw, unused_buffer_size=None):
+    def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
         """Create a new buffered reader using the given readable raw IO object.
         """
         assert raw.readable()
         self.raw = raw
         self._read_buf = b""
+        self.buffer_size = buffer_size
         if hasattr(raw, 'fileno'):
             self.fileno = raw.fileno
 
@@ -395,11 +463,13 @@
         mode. If n is None, read until EOF or until read() would
         block.
         """
-        assert n is None or n > 0
-        nodata_val = EOF
-        while (len(self._read_buf) < n) if (n is not None) else True:
-            current = self.raw.read(n)
-            if current in (EOF, None):
+        assert n is None or n > 0, '.read(): Bad read size %r' % n
+        nodata_val = b""
+        while n is None or len(self._read_buf) < n:
+            to_read = None if n is None else max(n, self.buffer_size)
+            current = self.raw.read(to_read)
+
+            if current in (b"", None):
                 nodata_val = current
                 break
             self._read_buf += current
@@ -428,6 +498,8 @@
 
 class BufferedWriter(BufferedIOBase):
 
+    # XXX docstring
+
     def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE,
                  max_buffer_size=DEFAULT_MAX_BUFFER_SIZE):
         assert raw.writable()
@@ -488,6 +560,8 @@
 
     A buffered reader object and buffered writer object put together to
     form a sequential IO object that can read and write.
+
+    This is typically used with a socket or two-way pipe.
     """
 
     def __init__(self, reader, writer, buffer_size=DEFAULT_BUFFER_SIZE,
@@ -528,6 +602,8 @@
 
 class BufferedRandom(BufferedReader, BufferedWriter):
 
+    # XXX docstring
+
     def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE,
                  max_buffer_size=DEFAULT_MAX_BUFFER_SIZE):
         assert raw.seekable()
@@ -561,7 +637,9 @@
         return BufferedReader.read(self, n)
 
     def write(self, b):
-        self._read_buf = b""
+        if self._read_buf:
+            self.raw.seek(-len(self._read_buf), 1) # Undo readahead
+            self._read_buf = b""
         return BufferedWriter.write(self, b)
 
     def flush(self):
@@ -569,3 +647,156 @@
 
     def close(self):
         self.raw.close()
+
+
+class TextIOBase(BufferedIOBase):
+
+    """Base class for text I/O.
+
+    This class provides a character and line based interface to stream I/O.
+    """
+
+    def read(self, n: int = -1) -> str:
+        """read(n: int = -1) -> str.  Read at most n characters from stream.
+
+        Read from underlying buffer until we have n characters or we hit EOF.
+        If n is negative or omitted, read until EOF.
+        """
+        raise IOError(".read() not supported")
+
+    def write(self, s: str):
+        """write(s: str) -> None.  Write string s to stream.
+        """
+        raise IOError(".write() not supported")
+
+    def readline(self) -> str:
+        """readline() -> str.  Read until newline or EOF.
+
+        Returns an empty string if EOF is hit immediately.
+        """
+        raise IOError(".readline() not supported")
+
+    def __iter__(self):
+        """__iter__() -> Iterator.  Return line iterator (actually just self).
+        """
+        return self
+
+    def next(self):
+        """Same as readline() except raises StopIteration on immediate EOF.
+        """
+        line = self.readline()
+        if line == '':
+            raise StopIteration
+        return line
+
+
+class TextIOWrapper(TextIOBase):
+
+    """Buffered text stream.
+
+    Character and line based layer over a BufferedIOBase object.
+    """
+
+    # XXX tell(), seek()
+
+    def __init__(self, buffer, encoding=None, newline=None):
+        if newline not in (None, '\n', '\r\n'):
+            raise IOError("illegal newline %s" % newline) # XXX: ValueError?
+        if encoding is None:
+            # XXX This is questionable
+            encoding = sys.getfilesystemencoding()
+            if encoding is None:
+                encoding = "latin-1"  # XXX, but this is best for transparency
+
+        self.buffer = buffer
+        self._encoding = encoding
+        self._newline = newline or os.linesep
+        self._fix_newlines = newline is None
+        self._decoder = None
+        self._pending = ''
+
+    def write(self, s: str):
+        return self.buffer.write(s.encode(self._encoding))
+
+    def _get_decoder(self):
+        make_decoder = codecs.getincrementaldecoder(self._encoding)
+        if make_decoder is None:
+            raise IOError(".readline() not supported for encoding %s" %
+                          self._encoding)
+        decoder = self._decoder = make_decoder()  # XXX: errors
+        if isinstance(decoder, codecs.BufferedIncrementalDecoder):
+            # XXX Hack: make the codec use bytes instead of strings
+            decoder.buffer = b""
+        return decoder
+
+    def read(self, n: int = -1):
+        decoder = self._decoder or self._get_decoder()
+        res = self._pending
+        if n < 0:
+            res += decoder.decode(self.buffer.read(), True)
+            self._pending = ''
+            return res
+        else:
+            while len(res) < n:
+                data = self.buffer.read(64)
+                res += decoder.decode(data, not data)
+                if not data:
+                    break
+            self._pending = res[n:]
+            return res[:n]
+
+    def readline(self):
+        line = self._pending
+        start = 0
+        decoder = self._decoder or self._get_decoder()
+
+        while True:
+            # In C we'd look for these in parallel of course.
+            nlpos = line.find("\n", start)
+            crpos = line.find("\r", start)
+            if nlpos >= 0 and crpos >= 0:
+                endpos = min(nlpos, crpos)
+            else:
+                endpos = nlpos if nlpos >= 0 else crpos
+
+            if endpos != -1:
+                endc = line[endpos]
+                if endc == "\n":
+                    ending = "\n"
+                    break
+
+                # We've seen \r - is it standalone, \r\n or \r at end of line?
+                if endpos + 1 < len(line):
+                    if line[endpos+1] == '\n':
+                        ending = "\r\n"
+                    else:
+                        ending = "\r"
+                    break
+                # There might be a following \n in the next block of data ...
+                start = endpos
+            else:
+                start = len(line)
+
+            # No line ending seen yet - get more data
+            while True:
+                data = self.buffer.read(64)
+                more_line = decoder.decode(data, not data)
+                if more_line != "" or not data:
+                    break
+
+            if more_line == "":
+                ending = ''
+                endpos = len(line)
+                break
+
+            line += more_line
+
+        nextpos = endpos + len(ending)
+        self._pending = line[nextpos:]
+
+        # XXX Update self.newlines here if we want to support that
+
+        if self._fix_newlines and ending != "\n" and ending != '':
+            return line[:endpos] + "\n"
+        else:
+            return line[:nextpos]

Modified: python/branches/p3yk/Lib/test/test_io.py
==============================================================================
--- python/branches/p3yk/Lib/test/test_io.py	(original)
+++ python/branches/p3yk/Lib/test/test_io.py	Fri Apr  6 19:31:18 2007
@@ -2,7 +2,7 @@
 
 import unittest
 from test import test_support
-
+from itertools import chain
 import io
 
 
@@ -16,7 +16,7 @@
         try:
             return self._readStack.pop(0)
         except:
-            return io.EOF
+            return b""
 
     def write(self, b):
         self._writeStack.append(b)
@@ -41,6 +41,18 @@
         return 42
 
 
+class MockFileIO(io.BytesIO):
+
+    def __init__(self, data):
+        self.read_history = []
+        io.BytesIO.__init__(self, data)
+
+    def read(self, n=None):
+        res = io.BytesIO.read(self, n)
+        self.read_history.append(None if res is None else len(res))
+        return res
+
+
 class MockNonBlockWriterIO(io.RawIOBase):
 
     def __init__(self, blockingScript):
@@ -147,31 +159,31 @@
         f.close()
 
 
-class BytesIOTest(unittest.TestCase):
+class MemorySeekTest(unittest.TestCase):
 
     def testInit(self):
-        buf = b"1234567890"
-        bytesIo = io.BytesIO(buf)
+        buf = self.buftype("1234567890")
+        bytesIo = self.ioclass(buf)
 
     def testRead(self):
-        buf = b"1234567890"
-        bytesIo = io.BytesIO(buf)
+        buf = self.buftype("1234567890")
+        bytesIo = self.ioclass(buf)
 
         self.assertEquals(buf[:1], bytesIo.read(1))
         self.assertEquals(buf[1:5], bytesIo.read(4))
         self.assertEquals(buf[5:], bytesIo.read(900))
-        self.assertEquals(io.EOF, bytesIo.read())
+        self.assertEquals(self.EOF, bytesIo.read())
 
     def testReadNoArgs(self):
-        buf = b"1234567890"
-        bytesIo = io.BytesIO(buf)
+        buf = self.buftype("1234567890")
+        bytesIo = self.ioclass(buf)
 
         self.assertEquals(buf, bytesIo.read())
-        self.assertEquals(io.EOF, bytesIo.read())
+        self.assertEquals(self.EOF, bytesIo.read())
 
     def testSeek(self):
-        buf = b"1234567890"
-        bytesIo = io.BytesIO(buf)
+        buf = self.buftype("1234567890")
+        bytesIo = self.ioclass(buf)
 
         bytesIo.read(5)
         bytesIo.seek(0)
@@ -181,8 +193,8 @@
         self.assertEquals(buf[3:], bytesIo.read())
 
     def testTell(self):
-        buf = b"1234567890"
-        bytesIo = io.BytesIO(buf)
+        buf = self.buftype("1234567890")
+        bytesIo = self.ioclass(buf)
 
         self.assertEquals(0, bytesIo.tell())
         bytesIo.seek(5)
@@ -191,6 +203,18 @@
         self.assertEquals(10000, bytesIo.tell())
 
 
+class BytesIOTest(MemorySeekTest):
+    buftype = bytes
+    ioclass = io.BytesIO
+    EOF = b""
+
+
+class StringIOTest(MemorySeekTest):
+    buftype = str
+    ioclass = io.StringIO
+    EOF = ""
+
+
 class BufferedReaderTest(unittest.TestCase):
 
     def testRead(self):
@@ -199,6 +223,25 @@
 
         self.assertEquals(b"abcdef", bufIo.read(6))
 
+    def testBuffering(self):
+        data = b"abcdefghi"
+        dlen = len(data)
+
+        tests = [
+            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
+            [ 100, [ 3, 3, 3],     [ dlen ]    ],
+            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
+        ]
+
+        for bufsize, buf_read_sizes, raw_read_sizes in tests:
+            rawIo = MockFileIO(data)
+            bufIo = io.BufferedReader(rawIo, buffer_size=bufsize)
+            pos = 0
+            for nbytes in buf_read_sizes:
+                self.assertEquals(bufIo.read(nbytes), data[pos:pos+nbytes])
+                pos += nbytes
+            self.assertEquals(rawIo.read_history, raw_read_sizes)
+
     def testReadNonBlocking(self):
         # Inject some None's in there to simulate EWOULDBLOCK
         rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
@@ -208,7 +251,7 @@
         self.assertEquals(b"e", bufIo.read(1))
         self.assertEquals(b"fg", bufIo.read())
         self.assert_(None is bufIo.read())
-        self.assertEquals(io.EOF, bufIo.read())
+        self.assertEquals(b"", bufIo.read())
 
     def testReadToEof(self):
         rawIo = MockIO((b"abc", b"d", b"efg"))
@@ -270,8 +313,9 @@
 
         bufIo.write(b"asdfasdfasdf")
 
-        # XXX I don't like this test. It relies too heavily on how the algorithm
-        # actually works, which we might change. Refactor later.
+        # XXX I don't like this test. It relies too heavily on how the
+        # algorithm actually works, which we might change. Refactor
+        # later.
 
     def testFileno(self):
         rawIo = MockIO((b"abc", b"d", b"efg"))
@@ -299,7 +343,7 @@
         # XXX need implementation
 
 
-class BufferedRandom(unittest.TestCase):
+class BufferedRandomTest(unittest.TestCase):
 
     def testReadAndWrite(self):
         raw = MockIO((b"asdf", b"ghjk"))
@@ -331,12 +375,56 @@
         self.assertEquals(7, rw.tell())
         self.assertEquals(b"fl", rw.read(11))
 
+
+class TextIOWrapperTest(unittest.TestCase):
+    def testNewlines(self):
+        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
+
+        tests = [
+            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
+            [ '\n', input_lines ],
+            [ '\r\n', input_lines ],
+        ]
+
+        encodings = ('utf-8', 'bz2')
+
+        # Try a range of pad sizes to test the case where \r is the last
+        # character in TextIOWrapper._pending.
+        for encoding in encodings:
+            for do_reads in (False, True):
+                for padlen in chain(range(10), range(50, 60)):
+                    pad = '.' * padlen
+                    data_lines = [ pad + line for line in input_lines ]
+                    # XXX: str.encode() should return bytes
+                    data = bytes(''.join(data_lines).encode(encoding))
+
+                    for newline, exp_line_ends in tests:
+                        exp_lines = [ pad + line for line in exp_line_ends ]
+                        bufIo = io.BufferedReader(io.BytesIO(data))
+                        textIo = io.TextIOWrapper(bufIo, newline=newline,
+                                                  encoding=encoding)
+                        if do_reads:
+                            got_lines = []
+                            while True:
+                                c2 = textIo.read(2)
+                                if c2 == '':
+                                    break
+                                self.assertEquals(len(c2), 2)
+                                got_lines.append(c2 + textIo.readline())
+                        else:
+                            got_lines = list(textIo)
+
+                        for got_line, exp_line in zip(got_lines, exp_lines):
+                            self.assertEquals(got_line, exp_line)
+                        self.assertEquals(len(got_lines), len(exp_lines))
+
 # XXX Tests for open()
 
 def test_main():
-    test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
+    test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
+                              BufferedReaderTest,
                               BufferedWriterTest, BufferedRWPairTest,
-                              BufferedRandom)
+                              BufferedRandomTest, TextIOWrapperTest)
 
 if __name__ == "__main__":
     test_main()


More information about the Python-3000-checkins mailing list