[Python-checkins] r45882 - sandbox/trunk/sio/io.py sandbox/trunk/sio/test_io.py
guido.van.rossum
python-checkins at python.org
Wed May 3 07:17:37 CEST 2006
Author: guido.van.rossum
Date: Wed May 3 07:17:37 2006
New Revision: 45882
Added:
sandbox/trunk/sio/test_io.py (contents, props changed)
Modified:
sandbox/trunk/sio/io.py
Log:
Some progress -- now using TDD.
Modified: sandbox/trunk/sio/io.py
==============================================================================
--- sandbox/trunk/sio/io.py (original)
+++ sandbox/trunk/sio/io.py Wed May 3 07:17:37 2006
@@ -93,20 +93,34 @@
"""A binary file using I/O on Unix file descriptors."""
- def __init__(self, fd):
+ def __init__(self, fd, __more=None,
+ readable=True, writable=False,
+ seekable=False, truncatable=False):
+ if __more is not None:
+ raise TypeError("only one positional argument is allowed")
self._fd = fd
+ self._readable = readable
+ self._writable = writable
+ self._seekable = seekable
+ self._truncatable = truncatable
+
+ def close(self):
+ fd = self._fd
+ self._fd = None
+ if fd is not None:
+ os.close(fd)
def readable(self):
- return True # May be a lie -- but how to tell the mode?
+ return self._readable
def writable(self):
- return True # May be a lie -- but how to tell the mode?
+ return self._writable
def seekable(self):
- return True # May be a lie -- but how to tell the mode?
+ return self._seekable
def truncatable(self):
- return True # May be a lie -- but how to tell the mode?
+ return self._truncatable
def read(self, n):
b = os.read(self._fd, n)
@@ -128,7 +142,7 @@
os.ftruncate(self._fd, offset)
return os.lseek(self._fd, 0, 2)
- # XXX ioctl, fcntl? What else?
+ # XXX ioctl? fcntl? isatty? fileno? What else?
class BufferingBinaryFileWrapper:
@@ -164,13 +178,32 @@
# else:
# self._buffer[self._bufptr : ] = data to be read
+ def close(self):
+ file = self._file
+ if file is not None:
+ self.flush()
+ self._file = None
+ file.close()
+
+ def readable(self):
+ return self._file.readable()
+
+ def writable(self):
+ return self._file.writable()
+
+ def seekable(self):
+ return self._file.seekable()
+
+ def truncatable(self):
+ return self._file.truncatable()
+
def flush(self):
if self._writing:
start = 0
while start < self._bufptr:
start += self._file.write(self._buffer[start : self._bufptr])
self._bufptr = 0
- elif self._bufptr < len(self._buffer):
+ elif self._bufptr < len(self._buffer) and self._file.seekable():
self._file.seek(self._bufptr - len(self._buffer), 1)
self._buffer = bytes()
self._bufptr = 0
@@ -224,22 +257,66 @@
self._buffer = bytes(self._bufsize)
if self._bufptr + len(b) < len(self._buffer):
self._buffer[self._bufptr : self._bufptr + len(b)] = b
+ self._bufptr += len(b)
elif self._bufptr + len(b) == len(self._buffer):
self._buffer[self._bufptr : ] = b
+ self._bufptr = len(self._buffer)
self.flush()
else:
n = len(self._buffer) - self._bufptr
self._buffer[self._bufptr : ] = b[ : n]
+ self._bufptr = len(self._buffer)
self.flush()
- self._bufptr[ : len(b) - n] = b[n : ]
+ self._buffer[ : len(b) - n] = b[n : ]
self._bufptr = len(b) - n
+ def seek(self, pos, whence=0):
+ if not self._file.seekable():
+ raise IOError("file is not seekable")
+ # XXX Optimize this for seeks while reading within the buffer?
+ self.flush()
+ self._file.seek(pos, whence)
+
+ def tell(self):
+ if not self._file.seekable():
+ raise IOError("file is not seekable")
+ pos = self._file.tell()
+ if self._writing:
+ pos += self._bufptr
+ else:
+ pos += self._bufptr - len(self._buffer)
+ return pos
-def main():
- import sys
- fd = sys.stdout.fileno()
- f = OSBinaryFile(fd)
- f.write(bytes(u"hello world\n", "ascii"))
+ def truncate(self, pos):
+ if not self._file.truncatable():
+ raise IOError("file is not truncatable")
+ self.flush()
+ return self._file.truncate(pos)
-if __name__ == "__main__":
- main()
+
+def open(filename, mode, bufsize=DEFAULT_BUFSIZE):
+ """Return an open file object.
+
+ This is a versatile factory function that can open binary and text
+ files for reading and writing.
+
+ """
+ if mode not in ("rb", "wb"):
+ raise ValueError("unrecognized mode: %r" % (mode,))
+ flags = getattr(os, "O_BINARY", 0)
+ readable = writable = False
+ if mode == "rb":
+ flags |= os.O_RDONLY
+ readable = True
+ else:
+ flags |= os.O_WRONLY | os.O_TRUNC | os.O_CREAT
+ writable = True
+ fd = os.open(filename, flags, 0666)
+ file = OSBinaryFile(fd,
+ readable=readable,
+ writable=writable,
+ seekable=True,
+ truncatable=hasattr(os, "ftruncate"))
+ if bufsize != 0:
+ file = BufferingBinaryFileWrapper(file, bufsize)
+ return file
Added: sandbox/trunk/sio/test_io.py
==============================================================================
--- (empty file)
+++ sandbox/trunk/sio/test_io.py Wed May 3 07:17:37 2006
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3.0
+
+"""Unit tests for io.py."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import io # Local
+
+
+class IOTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.tfn = tempfile.mktemp()
+
+ def tearDown(self):
+ if os.path.exists(self.tfn):
+ os.remove(self.tfn)
+
+ def test_unbuffered(self):
+ sample1 = bytes("hello ")
+ sample2 = bytes("world\n")
+ f = io.open(self.tfn, "wb", 0)
+ try:
+ f.write(sample1)
+ f.write(sample2)
+ finally:
+ f.close()
+
+ f = open(self.tfn) # Classic open!
+ try:
+ data = f.read()
+ finally:
+ f.close()
+ self.assertEquals(bytes(data), sample1+sample2)
+
+ f = io.open(self.tfn, "rb", 0)
+ try:
+ data = f.read(1)
+ self.assertEquals(data, sample1[:1])
+ self.assertEquals(f.tell(), 1)
+ f.seek(0)
+ self.assertEquals(f.tell(), 0)
+ data = f.read(len(sample1))
+ self.assertEquals(data, sample1)
+ data += f.read(100)
+ self.assertEquals(data, sample1+sample2)
+ finally:
+ f.close()
+
+ def test_buffered_read(self):
+ sample1 = bytes("hello ")
+ sample2 = bytes("world\n")
+ for bufsize in 1, 2, 3, 4, 5, 6, 7, 8, 16, 8*1024:
+ f = io.open(self.tfn, "wb", bufsize)
+ try:
+ f.write(sample1)
+ f.write(sample2)
+ finally:
+ f.close()
+
+ f = open(self.tfn) # Classic open!
+ try:
+ data = f.read()
+ finally:
+ f.close()
+ self.assertEquals(bytes(data), sample1+sample2,
+ "%r != %r, bufsize=%s" % (bytes(data),
+ sample1+sample2,
+ bufsize))
+
+ f = io.open(self.tfn, "rb", bufsize)
+ try:
+ data = f.read(1)
+ self.assertEquals(data, sample1[:1])
+ self.assertEquals(f.tell(), 1)
+ f.seek(0)
+ self.assertEquals(f.tell(), 0)
+ data = f.read(len(sample1))
+ self.assertEquals(data, sample1)
+ data += f.read(100)
+ self.assertEquals(data, sample1+sample2)
+ finally:
+ f.close()
+
+ os.remove(self.tfn)
+
+
+if __name__ == "__main__":
+ unittest.main()
More information about the Python-checkins mailing list