[Python-3000-checkins] r53984 - in python/branches/p3yk/Lib: io.py test/test_io.py

guido.van.rossum python-3000-checkins at python.org
Tue Feb 27 06:47:47 CET 2007


Author: guido.van.rossum
Date: Tue Feb 27 06:47:44 2007
New Revision: 53984

Added:
   python/branches/p3yk/Lib/io.py   (contents, props changed)
   python/branches/p3yk/Lib/test/test_io.py   (contents, props changed)
Log:
Checkpoint for new I/O library.


Added: python/branches/p3yk/Lib/io.py
==============================================================================
--- (empty file)
+++ python/branches/p3yk/Lib/io.py	Tue Feb 27 06:47:44 2007
@@ -0,0 +1,264 @@
+# Copyright 2006 Google, Inc. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+
+"""New I/O library.
+
+See PEP XXX; for now: http://docs.google.com/Doc?id=dfksfvqd_1cn5g5m
+"""
+
+__author__ = "Guido van Rossum <guido at python.org>"
+
+__all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO"]
+
+import os
+
+def open(filename, mode="r", buffering=None, *, encoding=None):
+    """Replacement for the built-in open function, with encoding parameter."""
+    assert isinstance(filename, str)
+    assert isinstance(mode, str)
+    assert buffering is None or isinstance(buffering, int)
+    assert encoding is None or isinstance(encoding, str)
+    modes = set(mode)
+    if modes - set("arwb+t") or len(mode) > len(modes):
+        raise ValueError("invalid mode: %r" % mode)
+    reading = "r" in modes
+    writing = "w" in modes or "a" in modes
+    binary = "b" in modes
+    appending = "a" in modes
+    updating = "+" in modes
+    text = "t" in modes or not binary
+    if text and binary:
+        raise ValueError("can't have text and binary mode at once")
+    if reading + writing + appending > 1:
+        raise ValueError("can't have read/write/append mode at once")
+    if not (reading or writing or appending):
+        raise ValueError("must have exactly one of read/write/append mode")
+    if binary and encoding is not None:
+        raise ValueError("binary mode doesn't take an encoding")
+    raw = FileIO(filename,
+                 (reading and "r" or "") +
+                 (writing and "w" or "") +
+                 (appending and "a" or "") +
+                 (updating and "+" or ""))
+    if buffering is None:
+        buffering = 8*1024  # International standard buffer size
+    if buffering < 0:
+        raise ValueError("invalid buffering size")
+    if buffering == 0:
+        if binary:
+            return raw
+        raise ValueError("can't have unbuffered text I/O")
+    if updating:
+        buffer = BufferedRandom(raw, buffering)
+    elif writing:
+        buffer = BufferedWriter(raw, buffering)
+    else:
+        assert reading
+        buffer = BufferedReader(raw, buffering)
+    if binary:
+        return buffer
+    assert text
+    textio = TextIOWrapper(buffer)  # Universal newlines default to on
+    return textio
+
+
+class RawIOBase:
+
+    """Base class for raw binary I/O."""
+
+    def read(self, n):
+        b = bytes(n.__index__())
+        self.readinto(b)
+        return b
+
+    def readinto(self, b):
+        raise IOError(".readinto() not supported")
+
+    def write(self, b):
+        raise IOError(".write() not supported")
+
+    def seek(self, pos, whence=0):
+        raise IOError(".seek() not supported")
+
+    def tell(self):
+        raise IOError(".tell() not supported")
+
+    def truncate(self, pos=None):
+        raise IOError(".truncate() not supported")
+
+    def close(self):
+        pass
+
+    def seekable(self):
+        return False
+
+    def readable(self):
+        return False
+
+    def writable(self):
+        return False
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def fileno(self):
+        raise IOError(".fileno() not supported")
+
+
+class FileIO(RawIOBase):
+
+    """Raw I/O implementation for OS files."""
+
+    def __init__(self, filename, mode):
+        self._seekable = None
+        self._mode = mode
+        if mode == "r":
+            flags = os.O_RDONLY
+        elif mode == "w":
+            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
+            self._writable = True
+        elif mode == "r+":
+            flags = os.O_RDWR
+        else:
+            assert 0, "unsupported mode %r (for now)" % mode
+        if hasattr(os, "O_BINARY"):
+            flags |= os.O_BINARY
+        self._fd = os.open(filename, flags)
+
+    def readinto(self, b):
+        # XXX We really should have os.readinto()
+        b[:] = os.read(self._fd, len(b))
+        return len(b)
+
+    def write(self, b):
+        return os.write(self._fd, b)
+
+    def seek(self, pos, whence=0):
+        os.lseek(self._fd, pos, whence)
+
+    def tell(self):
+        return os.lseek(self._fd, 0, 1)
+
+    def truncate(self, pos=None):
+        if pos is None:
+            pos = self.tell()
+        os.ftruncate(self._fd, pos)
+
+    def close(self):
+        os.close(self._fd)
+
+    def readable(self):
+        return "r" in self._mode or "+" in self._mode
+
+    def writable(self):
+        return "w" in self._mode or "+" in self._mode or "a" in self._mode
+
+    def seekable(self):
+        if self._seekable is None:
+            try:
+                os.lseek(self._fd, 0, 1)
+            except os.error:
+                self._seekable = False
+            else:
+                self._seekable = True
+        return self._seekable
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def fileno(self):
+        return self._fd
+
+    
+class SocketIO(RawIOBase):
+
+    """Raw I/O implementation for stream sockets."""
+
+    def __init__(self, sock, mode):
+        assert mode in ("r", "w", "rw")
+        self._sock = sock
+        self._mode = mode
+        self._readable = "r" in mode
+        self._writable = "w" in mode
+        self._seekable = False
+
+    def readinto(self, b):
+        return self._sock.recv_into(b)
+
+    def write(self, b):
+        return self._sock.send(b)
+
+    def close(self):
+        self._sock.close()
+
+    def readable(self):
+        return "r" in self._mode
+
+    def writable(self):
+        return "w" in self._mode
+
+
+class BytesIO(RawIOBase):
+
+    """Raw I/O implementation for bytes, like StringIO."""
+
+    def __init__(self, inital_bytes=None):
+        self._buffer = b""
+        self._pos = 0
+        if inital_bytes is not None:
+            self._buffer += inital_bytes
+
+    def getvalue(self):
+        return self._buffer
+
+    def read(self, n):
+        assert n >= 0
+        newpos = min(len(self._buffer), self._pos + n)
+        b = self._buffer[self._pos : newpos]
+        self._pos = newpos
+        return b
+
+    def readinto(self, b):
+        b[:] = self.read(len(b))
+
+    def write(self, b):
+        n = len(b)
+        newpos = self._pos + n
+        self._buffer[self._pos:newpos] = b
+        self._pos = newpos
+        return n
+
+    def seek(self, pos, whence=0):
+        if whence == 0:
+            self._pos = max(0, pos)
+        elif whence == 1:
+            self._pos = max(0, self._pos + pos)
+        elif whence == 2:
+            self._pos = max(0, len(self._buffer) + pos)
+        else:
+            raise IOError("invalid whence value")
+
+    def tell(self):
+        return self._pos
+
+    def truncate(self, pos=None):
+        if pos is None:
+            pos = self._pos
+        else:
+            self._pos = max(0, pos)
+        del self._buffer[pos:]
+
+    def readable(self):
+        return True
+
+    def writable(self):
+        return True
+
+    def seekable(self):
+        return True

Added: python/branches/p3yk/Lib/test/test_io.py
==============================================================================
--- (empty file)
+++ python/branches/p3yk/Lib/test/test_io.py	Tue Feb 27 06:47:44 2007
@@ -0,0 +1,62 @@
+import unittest
+from test import test_support
+
+import io
+
+class IOTest(unittest.TestCase):
+
+    def write_ops(self, f):
+        f.write(b"blah.")
+        f.seek(0)
+        f.write(b"Hello.")
+        self.assertEqual(f.tell(), 6)
+        f.seek(-1, 1)
+        self.assertEqual(f.tell(), 5)
+        f.write(" world\n\n\n")
+        f.seek(0)
+        f.write("h")
+        f.seek(-2, 2)
+        f.truncate()
+
+    def read_ops(self, f):
+        data = f.read(5)
+        self.assertEqual(data, b"hello")
+        f.readinto(data)
+        self.assertEqual(data, b" worl")
+        f.readinto(data)
+        self.assertEqual(data, b"d\n")
+        f.seek(0)
+        self.assertEqual(f.read(20), b"hello world\n")
+        f.seek(-6, 2)
+        self.assertEqual(f.read(5), b"world")
+        f.seek(-6, 1)
+        self.assertEqual(f.read(5), b" worl")
+        self.assertEqual(f.tell(), 10)
+
+    def test_raw_file_io(self):
+        f = io.open(test_support.TESTFN, "wb", buffering=0)
+        self.assertEqual(f.readable(), False)
+        self.assertEqual(f.writable(), True)
+        self.assertEqual(f.seekable(), True)
+        self.write_ops(f)
+        f.close()
+        f = io.open(test_support.TESTFN, "rb", buffering=0)
+        self.assertEqual(f.readable(), True)
+        self.assertEqual(f.writable(), False)
+        self.assertEqual(f.seekable(), True)
+        self.read_ops(f)
+        f.close()
+
+    def test_raw_bytes_io(self):
+        f = io.BytesIO()
+        self.write_ops(f)
+        data = f.getvalue()
+        self.assertEqual(data, b"hello world\n")
+        f = io.BytesIO(data)
+        self.read_ops(f)
+
+def test_main():
+    test_support.run_unittest(IOTest)
+
+if __name__ == "__main__":
+    test_main()


More information about the Python-3000-checkins mailing list