[Python-checkins] r45882 - sandbox/trunk/sio/io.py sandbox/trunk/sio/test_io.py

guido.van.rossum python-checkins at python.org
Wed May 3 07:17:37 CEST 2006


Author: guido.van.rossum
Date: Wed May  3 07:17:37 2006
New Revision: 45882

Added:
   sandbox/trunk/sio/test_io.py   (contents, props changed)
Modified:
   sandbox/trunk/sio/io.py
Log:
Some progress -- now using TDD.


Modified: sandbox/trunk/sio/io.py
==============================================================================
--- sandbox/trunk/sio/io.py	(original)
+++ sandbox/trunk/sio/io.py	Wed May  3 07:17:37 2006
@@ -93,20 +93,34 @@
 
     """A binary file using I/O on Unix file descriptors."""
 
-    def __init__(self, fd):
+    def __init__(self, fd, __more=None,
+                 readable=True, writable=False,
+                 seekable=False, truncatable=False):
+        if __more is not None:
+            raise TypeError("only one positional argument is allowed")
         self._fd = fd
+        self._readable = readable
+        self._writable = writable
+        self._seekable = seekable
+        self._truncatable = truncatable
+
+    def close(self):
+        fd = self._fd
+        self._fd = None
+        if fd is not None:
+            os.close(fd)
 
     def readable(self):
-        return True # May be a lie -- but how to tell the mode?
+        return self._readable
 
     def writable(self):
-        return True # May be a lie -- but how to tell the mode?
+        return self._writable
 
     def seekable(self):
-        return True # May be a lie -- but how to tell the mode?
+        return self._seekable
 
     def truncatable(self):
-        return True # May be a lie -- but how to tell the mode?
+        return self._truncatable
 
     def read(self, n):
         b = os.read(self._fd, n)
@@ -128,7 +142,7 @@
         os.ftruncate(self._fd, offset)
         return os.lseek(self._fd, 0, 2)
 
-    # XXX ioctl, fcntl?  What else?
+    # XXX ioctl? fcntl? isatty? fileno?  What else?
 
 
 class BufferingBinaryFileWrapper:
@@ -164,13 +178,32 @@
         #     else:
         #         self._buffer[self._bufptr : ] = data to be read
 
+    def close(self):
+        file = self._file
+        if file is not None:
+            self.flush()
+            self._file = None
+            file.close()
+
+    def readable(self):
+        return self._file.readable()
+
+    def writable(self):
+        return self._file.writable()
+
+    def seekable(self):
+        return self._file.seekable()
+
+    def truncatable(self):
+        return self._file.truncatable()
+
     def flush(self):
         if self._writing:
             start = 0
             while start < self._bufptr:
                 start += self._file.write(self._buffer[start : self._bufptr])
             self._bufptr = 0
-        elif self._bufptr < len(self._buffer):
+        elif self._bufptr < len(self._buffer) and self._file.seekable():
             self._file.seek(self._bufptr - len(self._buffer), 1)
             self._buffer = bytes()
             self._bufptr = 0
@@ -224,22 +257,66 @@
             self._buffer = bytes(self._bufsize)
         if self._bufptr + len(b) < len(self._buffer):
             self._buffer[self._bufptr : self._bufptr + len(b)] = b
+            self._bufptr += len(b)
         elif self._bufptr + len(b) == len(self._buffer):
             self._buffer[self._bufptr : ] = b
+            self._bufptr = len(self._buffer)
             self.flush()
         else:
             n = len(self._buffer) - self._bufptr
             self._buffer[self._bufptr : ] = b[ : n]
+            self._bufptr = len(self._buffer)
             self.flush()
-            self._bufptr[ : len(b) - n] = b[n : ]
+            self._buffer[ : len(b) - n] = b[n : ]
             self._bufptr = len(b) - n
 
+    def seek(self, pos, whence=0):
+        if not self._file.seekable():
+            raise IOError("file is not seekable")
+        # XXX Optimize this for seeks while reading within the buffer?
+        self.flush()
+        self._file.seek(pos, whence)
+
+    def tell(self):
+        if not self._file.seekable():
+            raise IOError("file is not seekable")
+        pos = self._file.tell()
+        if self._writing:
+            pos += self._bufptr
+        else:
+            pos +=  self._bufptr - len(self._buffer)
+        return pos
 
-def main():
-    import sys
-    fd = sys.stdout.fileno()
-    f = OSBinaryFile(fd)
-    f.write(bytes(u"hello world\n", "ascii"))
+    def truncate(self, pos):
+        if not self._file.truncatable():
+            raise IOError("file is not truncatable")
+        self.flush()
+        return self._file.truncate(pos)
 
-if __name__ == "__main__":
-    main()
+
+def open(filename, mode, bufsize=DEFAULT_BUFSIZE):
+    """Return an open file object.
+
+    This is a versatile factory function that can open binary and text
+    files for reading and writing.
+
+    """
+    if mode not in ("rb", "wb"):
+        raise ValueError("unrecognized mode: %r" % (mode,))
+    flags = getattr(os, "O_BINARY", 0)
+    readable = writable = False
+    if mode == "rb":
+        flags |= os.O_RDONLY
+        readable = True
+    else:
+        flags |= os.O_WRONLY | os.O_TRUNC | os.O_CREAT
+        writable = True
+    fd = os.open(filename, flags, 0666)
+    file = OSBinaryFile(fd,
+                        readable=readable,
+                        writable=writable,
+                        seekable=True,
+                        truncatable=hasattr(os, "ftruncate"))
+    if bufsize != 0:
+        file = BufferingBinaryFileWrapper(file, bufsize)
+    return file

Added: sandbox/trunk/sio/test_io.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/sio/test_io.py	Wed May  3 07:17:37 2006
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3.0
+
+"""Unit tests for io.py."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import io # Local
+
+
+class IOTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.tfn = tempfile.mktemp()
+
+    def tearDown(self):
+        if os.path.exists(self.tfn):
+            os.remove(self.tfn)
+
+    def test_unbuffered(self):
+        sample1 = bytes("hello ")
+        sample2 = bytes("world\n")
+        f = io.open(self.tfn, "wb", 0)
+        try:
+            f.write(sample1)
+            f.write(sample2)
+        finally:
+            f.close()
+
+        f = open(self.tfn) # Classic open!
+        try:
+            data = f.read()
+        finally:
+            f.close()
+        self.assertEquals(bytes(data), sample1+sample2)
+
+        f = io.open(self.tfn, "rb", 0)
+        try:
+            data = f.read(1)
+            self.assertEquals(data, sample1[:1])
+            self.assertEquals(f.tell(), 1)
+            f.seek(0)
+            self.assertEquals(f.tell(), 0)
+            data = f.read(len(sample1))
+            self.assertEquals(data, sample1)
+            data += f.read(100)
+            self.assertEquals(data, sample1+sample2)
+        finally:
+            f.close()
+
+    def test_buffered_read(self):
+        sample1 = bytes("hello ")
+        sample2 = bytes("world\n")
+        for bufsize in 1, 2, 3, 4, 5, 6, 7, 8, 16, 8*1024:
+            f = io.open(self.tfn, "wb", bufsize)
+            try:
+                f.write(sample1)
+                f.write(sample2)
+            finally:
+                f.close()
+
+            f = open(self.tfn) # Classic open!
+            try:
+                data = f.read()
+            finally:
+                f.close()
+            self.assertEquals(bytes(data), sample1+sample2,
+                              "%r != %r, bufsize=%s" % (bytes(data),
+                                                        sample1+sample2,
+                                                        bufsize))
+
+            f = io.open(self.tfn, "rb", bufsize)
+            try:
+                data = f.read(1)
+                self.assertEquals(data, sample1[:1])
+                self.assertEquals(f.tell(), 1)
+                f.seek(0)
+                self.assertEquals(f.tell(), 0)
+                data = f.read(len(sample1))
+                self.assertEquals(data, sample1)
+                data += f.read(100)
+                self.assertEquals(data, sample1+sample2)
+            finally:
+                f.close()
+
+            os.remove(self.tfn)
+        
+
+if __name__ == "__main__":
+    unittest.main()


More information about the Python-checkins mailing list