[Python-3000-checkins] r53995 - in python/branches/p3yk/Lib: io.py test/test_io.py

guido.van.rossum python-3000-checkins at python.org
Tue Feb 27 18:19:33 CET 2007


Author: guido.van.rossum
Date: Tue Feb 27 18:19:33 2007
New Revision: 53995

Modified:
   python/branches/p3yk/Lib/io.py
   python/branches/p3yk/Lib/test/test_io.py
Log:
Mike Verdone's checkpoint, cleaned up.
Also implemented Neal's suggestion (add fileno() to SocketIO)
and some unrelated changes, e.g. remove Google copyright
and make BytesIO a subclass of BufferedIOBase.


Modified: python/branches/p3yk/Lib/io.py
==============================================================================
--- python/branches/p3yk/Lib/io.py	(original)
+++ python/branches/p3yk/Lib/io.py	Tue Feb 27 18:19:33 2007
@@ -1,6 +1,3 @@
-# Copyright 2006 Google, Inc. All Rights Reserved.
-# Licensed to PSF under a Contributor Agreement.
-
 """New I/O library.
 
 This is an early prototype; eventually some of this will be
@@ -9,12 +6,17 @@
 See PEP XXX; for now: http://docs.google.com/Doc?id=dfksfvqd_1cn5g5m
 """
 
-__author__ = "Guido van Rossum <guido at python.org>"
+__author__ = ("Guido van Rossum <guido at python.org>, "
+              "Mike Verdone <mike.verdone at gmail.com>")
 
-__all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO"]
+__all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO",
+           "BufferedReader", "BufferedWriter", "BufferedRWPair", "EOF"]
 
 import os
 
+DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes
+EOF = b""
+
 def open(filename, mode="r", buffering=None, *, encoding=None):
     """Replacement for the built-in open function.
 
@@ -71,8 +73,8 @@
                  (appending and "a" or "") +
                  (updating and "+" or ""))
     if buffering is None:
-        buffering = 8*1024  # International standard buffer size
-        # Should default to line buffering if os.isatty(raw.fileno())
+        buffering = DEFAULT_BUFFER_SIZE
+        # XXX Should default to line buffering if os.isatty(raw.fileno())
         try:
             bs = os.fstat(raw.fileno()).st_blksize
         except (os.error, AttributeError):
@@ -219,7 +221,7 @@
     def fileno(self):
         return self._fd
 
-    
+
 class SocketIO(RawIOBase):
 
     """Raw I/O implementation for stream sockets."""
@@ -249,12 +251,18 @@
     def writable(self):
         return "w" in self._mode
 
-    # XXX(nnorwitz)???  def fileno(self): return self._sock.fileno()
+    def fileno(self):
+        return self._sock.fileno()
+
+
+class BufferedIOBase(RawIOBase):
 
+    """XXX Docstring."""
 
-class BytesIO(RawIOBase):
 
-    """Raw I/O implementation for bytes, like StringIO."""
+class BytesIO(BufferedIOBase):
+
+    """Buffered I/O implementation using a bytes buffer, like StringIO."""
 
     # XXX More docs
 
@@ -267,7 +275,9 @@
     def getvalue(self):
         return self._buffer
 
-    def read(self, n):
+    def read(self, n=None):
+        if n is None:
+            n = len(self._buffer)
         assert n >= 0
         newpos = min(len(self._buffer), self._pos + n)
         b = self._buffer[self._pos : newpos]
@@ -312,3 +322,113 @@
 
     def seekable(self):
         return True
+
+
+class BufferedReader(BufferedIOBase):
+
+    """Buffered reader.
+
+    Buffer for a readable sequential RawIO object. Does not allow
+    random access (seek, tell).
+    """
+
+    def __init__(self, raw):
+        """
+        Create a new buffered reader using the given readable raw IO object.
+        """
+        assert raw.readable()
+        self.raw = raw
+        self._read_buf = b''
+        if hasattr(raw, 'fileno'):
+            self.fileno = raw.fileno
+
+    def read(self, n=None):
+        """
+        Read n bytes. Returns exactly n bytes of data unless the underlying
+        raw IO stream reaches EOF or if the call would block in non-blocking
+        mode. If n is None, read until EOF or until read() would block.
+        """
+        nodata_val = EOF
+        while (len(self._read_buf) < n) if (n is not None) else True:
+            current = self.raw.read(n)
+            if current in (EOF, None):
+                nodata_val = current
+                break
+            self._read_buf += current # XXX using += is bad
+        read = self._read_buf[:n]
+        if (not self._read_buf):
+            return nodata_val
+        self._read_buf = self._read_buf[n if n else 0:]
+        return read
+
+    def write(self, b):
+        raise IOError(".write() unsupported")
+
+    def readable(self):
+        return True
+
+    def flush(self):
+        # Flush is a no-op
+        pass
+
+
+class BufferedWriter(BufferedIOBase):
+
+    """Buffered writer.
+
+    XXX More docs.
+    """
+
+    def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
+        assert raw.writeable()
+        self.raw = raw
+        self.buffer_size = buffer_size
+        self._write_buf_stack = []
+        self._write_buf_size = 0
+        if hasattr(raw, 'fileno'):
+            self.fileno = raw.fileno
+
+    def read(self, n=None):
+        raise IOError(".read() not supported")
+
+    def write(self, b):
+        assert issubclass(type(b), bytes)
+        self._write_buf_stack.append(b)
+        self._write_buf_size += len(b)
+        if (self._write_buf_size > self.buffer_size):
+            self.flush()
+
+    def writeable(self):
+        return True
+
+    def flush(self):
+        buf = b''.join(self._write_buf_stack)
+        while len(buf):
+            buf = buf[self.raw.write(buf):]
+        self._write_buf_stack = []
+        self._write_buf_size = 0
+
+    # XXX support flushing buffer on close, del
+
+
+class BufferedRWPair(BufferedReader, BufferedWriter):
+
+    """Buffered Read/Write Pair.
+
+    A buffered reader object and buffered writer object put together to
+    form a sequential IO object that can read and write.
+    """
+
+    def __init__(self, bufferedReader, bufferedWriter):
+        assert bufferedReader.readable()
+        assert bufferedWriter.writeable()
+        self.bufferedReader = bufferedReader
+        self.bufferedWriter = bufferedWriter
+        self.read = bufferedReader.read
+        self.write = bufferedWriter.write
+        self.flush = bufferedWriter.flush
+        self.readable = bufferedReader.readable
+        self.writeable = bufferedWriter.writeable
+
+    def seekable(self):
+        return False

Modified: python/branches/p3yk/Lib/test/test_io.py
==============================================================================
--- python/branches/p3yk/Lib/test/test_io.py	(original)
+++ python/branches/p3yk/Lib/test/test_io.py	Tue Feb 27 18:19:33 2007
@@ -1,8 +1,43 @@
+"""Unit tests for io.py."""
+
 import unittest
 from test import test_support
 
 import io
 
+
+class MockReadIO(io.RawIOBase):
+    def __init__(self, readStack):
+        self._readStack = list(readStack)
+
+    def read(self, n=None):
+        try:
+            return self._readStack.pop(0)
+        except:
+            return io.EOF
+
+    def fileno(self):
+        return 42
+
+    def readable(self):
+        return True
+
+
+class MockWriteIO(io.RawIOBase):
+    def __init__(self):
+        self._writeStack = []
+
+    def write(self, b):
+        self._writeStack.append(b)
+        return len(b)
+
+    def writeable(self):
+        return True
+
+    def fileno(self):
+        return 42
+
+
 class IOTest(unittest.TestCase):
 
     def write_ops(self, f):
@@ -55,8 +90,117 @@
         f = io.BytesIO(data)
         self.read_ops(f)
 
+
+class BytesIOTest(unittest.TestCase):
+
+    def testInit(self):
+        buf = b"1234567890"
+        bytesIo = io.BytesIO(buf)
+
+    def testRead(self):
+        buf = b"1234567890"
+        bytesIo = io.BytesIO(buf)
+
+        self.assertEquals(buf[:1], bytesIo.read(1))
+        self.assertEquals(buf[1:5], bytesIo.read(4))
+        self.assertEquals(buf[5:], bytesIo.read(900))
+        self.assertEquals(io.EOF, bytesIo.read())
+
+    def testReadNoArgs(self):
+        buf = b"1234567890"
+        bytesIo = io.BytesIO(buf)
+
+        self.assertEquals(buf, bytesIo.read())
+        self.assertEquals(io.EOF, bytesIo.read())
+
+    def testSeek(self):
+        buf = b"1234567890"
+        bytesIo = io.BytesIO(buf)
+
+        bytesIo.read(5)
+        bytesIo.seek(0)
+        self.assertEquals(buf, bytesIo.read())
+
+        bytesIo.seek(3)
+        self.assertEquals(buf[3:], bytesIo.read())
+
+    def testTell(self):
+        buf = b"1234567890"
+        bytesIo = io.BytesIO(buf)
+
+        self.assertEquals(0, bytesIo.tell())
+        bytesIo.seek(5)
+        self.assertEquals(5, bytesIo.tell())
+        bytesIo.seek(10000)
+        self.assertEquals(10000, bytesIo.tell())
+
+
+class BufferedReaderTest(unittest.TestCase):
+
+    def testRead(self):
+        rawIo = MockReadIO((b"abc", b"d", b"efg"))
+        bufIo = io.BufferedReader(rawIo)
+
+        self.assertEquals(b"abcdef", bufIo.read(6))
+
+    def testReadToEof(self):
+        rawIo = MockReadIO((b"abc", b"d", b"efg"))
+        bufIo = io.BufferedReader(rawIo)
+
+        self.assertEquals(b"abcdefg", bufIo.read(9000))
+
+    def testReadNoArgs(self):
+        rawIo = MockReadIO((b"abc", b"d", b"efg"))
+        bufIo = io.BufferedReader(rawIo)
+
+        self.assertEquals(b"abcdefg", bufIo.read())
+
+    def testFileno(self):
+        rawIo = MockReadIO((b"abc", b"d", b"efg"))
+        bufIo = io.BufferedReader(rawIo)
+
+        self.assertEquals(42, bufIo.fileno())
+
+    def testFilenoNoFileno(self):
+        # TODO will we always have fileno() function? If so, kill
+        # this test. Else, write it.
+        pass
+
+
+class BufferedWriterTest(unittest.TestCase):
+
+    def testWrite(self):
+        # Write to the buffered IO but don't overflow the buffer.
+        writer = MockWriteIO()
+        bufIo = io.BufferedWriter(writer, 8)
+
+        bufIo.write(b"abc")
+
+        self.assertFalse(writer._writeStack)
+
+    def testWriteOverflow(self):
+        writer = MockWriteIO()
+        bufIo = io.BufferedWriter(writer, 8)
+
+        bufIo.write(b"abc")
+        bufIo.write(b"defghijkl")
+
+        self.assertEquals(b"abcdefghijkl", writer._writeStack[0])
+
+    def testFlush(self):
+        writer = MockWriteIO()
+        bufIo = io.BufferedWriter(writer, 8)
+
+        bufIo.write(b"abc")
+        bufIo.flush()
+
+        self.assertEquals(b"abc", writer._writeStack[0])
+
+# TODO. Tests for open()
+
 def test_main():
-    test_support.run_unittest(IOTest)
+    test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
+                              BufferedWriterTest)
 
 if __name__ == "__main__":
     test_main()


More information about the Python-3000-checkins mailing list