[pypy-svn] r7530 - in pypy/trunk/src/pypy/appspace: . test
jacob at codespeak.net
jacob at codespeak.net
Sat Nov 20 19:52:22 CET 2004
Author: jacob
Date: Sat Nov 20 19:52:21 2004
New Revision: 7530
Added:
pypy/trunk/src/pypy/appspace/_file.py
pypy/trunk/src/pypy/appspace/sio.py
pypy/trunk/src/pypy/appspace/test/test_file.py
pypy/trunk/src/pypy/appspace/test/test_sio.py
Log:
Added builtin file with support. Reading and writing work both buffered and unbuffered. Writing works linebuffered as well. Universal newline support works. Buffered combined read/write does not work yet. Linebuffered read does not work yet.
Added: pypy/trunk/src/pypy/appspace/_file.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/_file.py Sat Nov 20 19:52:21 2004
@@ -0,0 +1,51 @@
+import sio
+
class file_(object):
    """An implementation of file objects in Python.

    It relies on Guido's sio.py implementation: a DiskFile basis stream,
    optionally wrapped in a TextInputFilter (universal newlines) and a
    buffering stream chosen from the mode/bufsize arguments.
    """
    def __init__(self, filename, mode='r', bufsize=None):
        self.reading = False
        self.writing = False

        if not mode:
            raise IOError('invalid mode : ')
        if mode[0] not in ['r', 'w', 'a', 'U']:
            raise IOError('invalid mode : %s' % mode)
        else:
            if mode[0] in ['r', 'U']:
                self.reading = True
            else:
                self.writing = True
            try:
                # The '+' flag may appear either directly after the mode
                # letter or after a 'b'.
                if mode[1] == 'b':
                    plus = mode[2]
                else:
                    plus = mode[1]
                if plus == '+':
                    self.reading = self.writing = True
            except IndexError:
                pass

        self.fd = sio.DiskFile(filename, mode)
        if mode in ['U', 'rU']:
            # Wants universal newlines
            self.fd = sio.TextInputFilter(self.fd)
        # Negative bufsize means "use the default" (None).  The explicit
        # None check keeps the comparison well-defined.
        if bufsize is not None and bufsize < 0:
            bufsize = None
        if not self.writing and (bufsize is None or bufsize > 0):
            self.fd = sio.BufferingInputStream(self.fd, bufsize)
        if not self.reading:
            if bufsize is None or bufsize > 1:
                self.fd = sio.BufferingOutputStream(self.fd, bufsize)
            elif bufsize == 1:
                self.fd = sio.LineBufferingOutputStream(self.fd)
        # BUG FIX: this method used to end with ``return self.fd``;
        # __init__() must return None, so instantiating file_() raised
        # "TypeError: __init__() should return None".

    def __getattr__(self, name):
        """Delegate all other methods to the underlying file object."""
        return getattr(self.fd, name)
+
+
Added: pypy/trunk/src/pypy/appspace/sio.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/sio.py Sat Nov 20 19:52:21 2004
@@ -0,0 +1,790 @@
+"""New standard I/O library.
+
+This code is still very young and experimental!
+
+There are fairly complete unit tests in test_sio.py.
+
+The design is simple:
+
+- A raw stream supports read(n), write(s), seek(offset, whence=0) and
+ tell(). This is generally unbuffered. Raw streams may support
+ Unicode.
+
+- A basis stream provides the raw stream API and builds on a much more
+ low-level API, e.g. the os, mmap or socket modules.
+
+- A filtering stream is a raw stream built on top of another raw stream.
+ There are filtering streams for universal newline translation and
+ for unicode translation.
+
+- A buffering stream supports the full classic Python I/O API:
+ read(n=-1), readline(), readlines(sizehint=0), tell(), seek(offset,
+ whence=0), write(s), writelines(lst), as well as __iter__() and
+ next(). (There's also readall() but that's a synonym for read()
+ without arguments.) This is a superset of the raw stream API. I
+ haven't thought about fileno() and isatty() yet, nor about
+ truncate() or the various attributes like name and mode. Also,
+ close() is not implemented right. We really need only one buffering
+ stream implementation, which is a filtering stream.
+
+You typically take a basis stream, place zero or more filtering
+streams on top of it, and then top it off with a buffering stream.
+
+"""
+
+import os
+import mmap
+
+class BufferingInputStream(object):
+
+ """Standard buffering input stream.
+
+ This is typically the top of the stack.
+ """
+
+ bigsize = 2**19 # Half a Meg
+ bufsize = 2**13 # 8 K
+
+ def __init__(self, base, bufsize=None):
+ self.do_read = getattr(base, "read", None)
+ # function to fill buffer some more
+ self.do_tell = getattr(base, "tell", None)
+ # None, or return a byte offset
+ self.do_seek = getattr(base, "seek", None)
+ # None, or seek to a byte offset
+ self.close = base.close
+
+ if bufsize is None: # Get default from the class
+ bufsize = self.bufsize
+ self.bufsize = bufsize # buffer size (hint only)
+ self.lines = [] # ready-made lines (sans "\n")
+ self.buf = "" # raw data (may contain "\n")
+ # Invariant: readahead == "\n".join(self.lines + [self.buf])
+ # self.lines contains no "\n"
+ # self.buf may contain "\n"
+
+ def tell(self):
+ bytes = self.do_tell() # This may fail
+ offset = len(self.buf)
+ for line in self.lines:
+ offset += len(line) + 1
+ assert bytes >= offset, (locals(), self.__dict__)
+ return bytes - offset
+
+ def seek(self, offset, whence=0):
+ # This may fail on the do_seek() or do_tell() call.
+ # But it won't call either on a relative forward seek.
+ # Nor on a seek to the very end.
+ if whence == 0 or (whence == 2 and self.do_seek is not None):
+ self.do_seek(offset, whence)
+ self.lines = []
+ self.buf = ""
+ return
+ if whence == 2:
+ # Skip relative to EOF by reading and saving only just as
+ # much as needed
+ assert self.do_seek is None
+ data = "\n".join(self.lines + [self.buf])
+ total = len(data)
+ buffers = [data]
+ self.lines = []
+ self.buf = ""
+ while 1:
+ data = self.do_read(self.bufsize)
+ if not data:
+ break
+ buffers.append(data)
+ total += len(data)
+ while buffers and total >= len(buffers[0]) - offset:
+ total -= len(buffers[0])
+ del buffers[0]
+ cutoff = total + offset
+ if cutoff < 0:
+ raise TypeError, "cannot seek back"
+ if buffers:
+ buffers[0] = buffers[0][cutoff:]
+ self.buf = "".join(buffers)
+ self.lines = []
+ return
+ if whence == 1:
+ if offset < 0:
+ self.do_seek(self.tell() + offset, 0)
+ self.lines = []
+ self.buf = ""
+ return
+ while self.lines:
+ line = self.lines[0]
+ if offset <= len(line):
+ self.lines[0] = line[offset:]
+ return
+ offset -= len(self.lines[0]) - 1
+ del self.lines[0]
+ assert not self.lines
+ if offset <= len(self.buf):
+ self.buf = self.buf[offset:]
+ return
+ offset -= len(self.buf)
+ self.buf = ""
+ if self.do_seek is None:
+ self.read(offset)
+ else:
+ self.do_seek(offset, 1)
+ return
+ raise ValueError, "whence should be 0, 1 or 2"
+
+ def readall(self):
+ self.lines.append(self.buf)
+ more = ["\n".join(self.lines)]
+ self.lines = []
+ self.buf = ""
+ bufsize = self.bufsize
+ while 1:
+ data = self.do_read(bufsize)
+ if not data:
+ break
+ more.append(data)
+ bufsize = max(bufsize*2, self.bigsize)
+ return "".join(more)
+
+ def read(self, n=-1):
+ if n < 0:
+ return self.readall()
+
+ if self.lines:
+ # See if this can be satisfied from self.lines[0]
+ line = self.lines[0]
+ if len(line) >= n:
+ self.lines[0] = line[n:]
+ return line[:n]
+
+ # See if this can be satisfied *without exhausting* self.lines
+ k = 0
+ i = 0
+ for line in self.lines:
+ k += len(line)
+ if k >= n:
+ lines = self.lines[:i]
+ data = self.lines[i]
+ cutoff = len(data) - (k-n)
+ lines.append(data[:cutoff])
+ self.lines[:i+1] = [data[cutoff:]]
+ return "\n".join(lines)
+ k += 1
+ i += 1
+
+ # See if this can be satisfied from self.lines plus self.buf
+ if k + len(self.buf) >= n:
+ lines = self.lines
+ self.lines = []
+ cutoff = n - k
+ lines.append(self.buf[:cutoff])
+ self.buf = self.buf[cutoff:]
+ return "\n".join(lines)
+
+ else:
+ # See if this can be satisfied from self.buf
+ data = self.buf
+ k = len(data)
+ if k >= n:
+ cutoff = len(data) - (k-n)
+ self.buf = data[cutoff:]
+ return data[:cutoff]
+
+ lines = self.lines
+ self.lines = []
+ lines.append(self.buf)
+ self.buf = ""
+ data = "\n".join(lines)
+ more = [data]
+ k = len(data)
+ while k < n:
+ data = self.do_read(max(self.bufsize, n-k))
+ k += len(data)
+ more.append(data)
+ if not data:
+ break
+ cutoff = len(data) - (k-n)
+ self.buf = data[cutoff:]
+ more[-1] = data[:cutoff]
+ return "".join(more)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self.lines:
+ return self.lines.pop(0) + "\n"
+
+ # This block is needed because read() can leave self.buf
+ # containing newlines
+ self.lines = self.buf.split("\n")
+ self.buf = self.lines.pop()
+ if self.lines:
+ return self.lines.pop(0) + "\n"
+
+ buf = self.buf and [self.buf] or []
+ while 1:
+ self.buf = self.do_read(self.bufsize)
+ self.lines = self.buf.split("\n")
+ self.buf = self.lines.pop()
+ if self.lines:
+ buf.append(self.lines.pop(0))
+ buf.append("\n")
+ break
+ if not self.buf:
+ break
+ buf.append(self.buf)
+
+ line = "".join(buf)
+ if not line:
+ raise StopIteration
+ return line
+
+ def readline(self):
+ try:
+ return self.next()
+ except StopIteration:
+ return ""
+
+ def readlines(self, sizehint=0):
+ return list(self)
+
class BufferingOutputStream(object):

    """Standard buffering output stream.

    This is typically the top of the stack.
    """

    bigsize = 2**19 # Half a Meg
    bufsize = 2**13 # 8 K

    def __init__(self, base, bufsize=None):
        self.do_write = base.write  # Flush buffer
        self.do_tell = base.tell
        # Return a byte offset; has to exist or this __init__() will fail
        self.do_seek = getattr(base, "seek", None)
        # None, or seek to a byte offset
        self.do_close = base.close  # Close file

        if bufsize is None:     # Get default from the class
            bufsize = self.bufsize
        self.bufsize = bufsize  # buffer size (hint only)
        self.buf = ""
        self.tell()             # prime self.pos from the base stream

    def tell(self):
        """Logical position: base position plus unflushed buffer."""
        assert self.do_tell is not None
        if not hasattr(self, 'pos'):
            self.pos = self.do_tell()

        return self.pos

    def seek(self, offset, whence=0):
        # Flush before moving, then resynchronize our position.
        self.do_write(self.buf)
        self.buf = ''
        self.do_seek(offset, whence)
        self.pos = self.do_tell()

    def write(self, data):
        """Buffer data, flushing whole bufsize-d chunks to the base."""
        buflen = len(self.buf)
        datalen = len(data)
        if datalen + buflen < self.bufsize:
            self.buf += data
            self.pos += datalen
        else:
            self.buf += data[:self.bufsize-buflen]
            self.pos += self.bufsize-buflen
            self.do_write(self.buf)
            self.buf = ''
            self.write(data[self.bufsize-buflen:])

    def close(self):
        """Flush remaining data and close the base stream (once)."""
        self.do_write(self.buf)
        self.buf = ''
        # BUG FIX: this used to read ``if self.do_close(): self.do_close()``,
        # which closed the base once unconditionally and then a *second*
        # time whenever close() returned a true value.  The intent was
        # clearly a guard, not a call.
        if self.do_close is not None:
            self.do_close()
+
class LineBufferingOutputStream(BufferingOutputStream):

    """Line buffering output stream.

    This is typically the top of the stack.  The buffer is flushed to
    the base stream every time a full os.linesep-terminated line has
    been accumulated.
    """

    def __init__(self, base, bufsize=None):
        self.do_write = base.write  # Flush buffer
        self.do_tell = base.tell
        # Return a byte offset; has to exist or this __init__() will fail
        self.do_seek = getattr(base, "seek", None)
        # None, or seek to a byte offset
        self.do_close = base.close  # Close file

        self.linesep = os.linesep
        self.buf = ""               # raw data (may contain "\n")
        self.tell()                 # prime self.pos from the base stream

    def tell(self):
        assert self.do_tell is not None
        if not hasattr(self, 'pos'):
            self.pos = self.do_tell()

        return self.pos

    def seek(self, offset, whence=0):
        # Flush before moving, then resynchronize our position.
        self.do_write(self.buf)
        self.buf = ''
        self.do_seek(offset, whence)
        self.pos = self.do_tell()

    def write(self, data):
        """Buffer data, flushing after every complete line."""
        all_lines = data.split(self.linesep)
        full_lines = all_lines[:-1]
        for line in full_lines:
            line += self.linesep
            buflen = len(self.buf)
            linelen = len(line)
            if linelen + buflen < self.bufsize:
                self.buf += line
                self.pos += linelen
                self.do_write(self.buf)     # complete line: flush now
                self.buf = ''
            else:
                self.buf += line[:self.bufsize-buflen]
                self.pos += self.bufsize-buflen
                self.do_write(self.buf)
                self.buf = ''
                self.write(line[self.bufsize-buflen:])

        # The last part of the split data never has a terminating linesep.
        # If the data has a terminating linesep, the last element is an
        # empty string.

        line = all_lines[-1]
        buflen = len(self.buf)
        linelen = len(line)
        if linelen + buflen < self.bufsize:
            self.buf += line
            self.pos += linelen
        else:
            self.buf += line[:self.bufsize-buflen]
            self.pos += self.bufsize-buflen
            self.do_write(self.buf)
            self.buf = ''
            self.write(line[self.bufsize-buflen:])

    def close(self):
        """Flush remaining data and close the base stream (once)."""
        self.do_write(self.buf)
        self.buf = ''
        # BUG FIX: same as BufferingOutputStream.close() -- the old
        # ``if self.do_close(): self.do_close()`` closed the base twice
        # whenever close() returned a true value.
        if self.do_close is not None:
            self.do_close()
+
class CRLFFilter(object):

    """Filtering stream for universal newlines.

    TextInputFilter is more general, but this is faster when you don't
    need tell/seek.
    """

    def __init__(self, base):
        self.do_read = base.read
        self.atcr = False       # last chunk ended with a bare "\r"
        self.close = base.close

    def read(self, n):
        """Read up to n bytes, translating \\r\\n and \\r to \\n."""
        chunk = self.do_read(n)
        if self.atcr:
            self.atcr = False
            # Very rare case: the previous chunk ended in the middle
            # of an "\r\n" pair -- drop the leftover "\n".
            if chunk[:1] == "\n":
                chunk = chunk[1:]
        if "\r" not in chunk:
            return chunk
        # Remember a trailing "\r" before it is rewritten below.
        self.atcr = chunk.endswith("\r")
        # Collapse "\r\n" first so the remaining "\r" are standalone.
        return chunk.replace("\r\n", "\n").replace("\r", "\n")
+
class MMapFile(object):

    """Standard I/O basis stream using mmap."""

    def __init__(self, filename, mode="r"):
        self.filename = filename
        self.mode = mode
        if mode == "r":
            flag = os.O_RDONLY
            self.access = mmap.ACCESS_READ
        else:
            if mode == "w":
                flag = os.O_RDWR | os.O_CREAT
            elif mode == "a":
                flag = os.O_RDWR
            else:
                raise ValueError("mode should be 'r', 'w' or 'a'")
            self.access = mmap.ACCESS_WRITE
        if hasattr(os, "O_BINARY"):
            flag |= os.O_BINARY
        self.fd = os.open(filename, flag)
        size = os.fstat(self.fd).st_size
        self.mm = mmap.mmap(self.fd, size, access=self.access)
        self.pos = 0

    def __del__(self):
        self.close()

    # Class-level fallbacks so close() is safe even if __init__ failed
    # before setting the instance attributes.
    mm = fd = None

    def close(self):
        if self.mm is not None:
            self.mm.close()
            self.mm = None
        if self.fd is not None:
            os.close(self.fd)
            self.fd = None

    def tell(self):
        return self.pos

    def seek(self, offset, whence=0):
        # Position is clamped at 0; seeking past EOF is allowed.
        if whence == 0:
            self.pos = max(0, offset)
        elif whence == 1:
            self.pos = max(0, self.pos + offset)
        elif whence == 2:
            self.pos = max(0, self.mm.size() + offset)
        else:
            raise ValueError("seek(): whence must be 0, 1 or 2")

    def readall(self):
        return self.read()

    def read(self, n=-1):
        """Read up to n bytes (all remaining if n < 0)."""
        if n >= 0:
            aim = self.pos + n
        else:
            aim = self.mm.size()  # Actual file size, may be more than mapped
            n = aim - self.pos
        data = self.mm[self.pos:aim]
        if len(data) < n:
            del data
            # File grew since opened; remap to get the new data
            size = os.fstat(self.fd).st_size
            self.mm = mmap.mmap(self.fd, size, access=self.access)
            data = self.mm[self.pos:aim]
        self.pos += len(data)
        return data

    def __iter__(self):
        return self

    def readline(self):
        """Return the next line, or "" at EOF."""
        hit = self.mm.find("\n", self.pos) + 1
        if hit:
            data = self.mm[self.pos:hit]
            self.pos = hit
            return data
        # Remap the file just in case
        size = os.fstat(self.fd).st_size
        self.mm = mmap.mmap(self.fd, size, access=self.access)
        hit = self.mm.find("\n", self.pos) + 1
        if hit:
            # Got a whole line after remapping
            data = self.mm[self.pos:hit]
            self.pos = hit
            return data
        # Read whatever we've got -- may be empty
        data = self.mm[self.pos:self.mm.size()]
        self.pos += len(data)
        return data

    def next(self):
        """Like readline(), but raises StopIteration at EOF."""
        hit = self.mm.find("\n", self.pos) + 1
        if hit:
            data = self.mm[self.pos:hit]
            self.pos = hit
            return data
        # Remap the file just in case
        size = os.fstat(self.fd).st_size
        self.mm = mmap.mmap(self.fd, size, access=self.access)
        hit = self.mm.find("\n", self.pos) + 1
        if hit:
            # Got a whole line after remapping
            data = self.mm[self.pos:hit]
            self.pos = hit
            return data
        # Read whatever we've got -- may be empty
        data = self.mm[self.pos:self.mm.size()]
        if not data:
            raise StopIteration
        self.pos += len(data)
        return data

    def readlines(self, sizehint=0):
        # sizehint is accepted for interface compatibility but ignored.
        return list(iter(self.readline, ""))

    def write(self, data):
        end = self.pos + len(data)
        try:
            self.mm[self.pos:end] = data
        # This can raise IndexError on Windows, ValueError on Unix
        except (IndexError, ValueError):
            # XXX On Unix, this resize() call doesn't work
            self.mm.resize(end)
            self.mm[self.pos:end] = data
        self.pos = end

    def writelines(self, lines):
        # BUG FIX (idiom): this was ``filter(self.write, lines)``, which
        # abused filter() for its side effect and built a throwaway list;
        # with a lazy filter() nothing would be written at all.
        for line in lines:
            self.write(line)
+
class DiskFile(object):

    """Standard I/O basis stream using os.open/close/read/write/lseek"""

    # Mapping from file()-style mode strings to os.open() flag bits.
    # Note the 'a' modes use O_CREAT | O_EXCL: create-if-missing is tried
    # first, and __init__ retries without those bits if the file exists.
    modes = {
        'r'  : os.O_RDONLY,
        'rb' : os.O_RDONLY,
        'rU' : os.O_RDONLY,
        'U'  : os.O_RDONLY,
        'w'  : os.O_WRONLY,
        'wb' : os.O_WRONLY,
        'a'  : os.O_WRONLY | os.O_CREAT | os.O_EXCL,
        'ab' : os.O_WRONLY | os.O_CREAT | os.O_EXCL,
        'r+' : os.O_RDWR,
        'rb+': os.O_RDWR,
        'r+b': os.O_RDWR,
        'w+' : os.O_RDWR | os.O_CREAT,
        'wb+': os.O_RDWR | os.O_CREAT,
        'w+b': os.O_RDWR | os.O_CREAT,
        'a+' : os.O_RDWR | os.O_CREAT | os.O_EXCL,
        'ab+': os.O_RDWR | os.O_CREAT | os.O_EXCL,
        'a+b': os.O_RDWR | os.O_CREAT | os.O_EXCL,
        }
    def __init__(self, filename, mode="r"):
        self.filename = filename
        self.mode = mode
        try:
            flag = DiskFile.modes[mode]
        except KeyError:
            raise ValueError("mode should be 'r', 'r+', 'w', 'w+' or 'a+'")

        if hasattr(os, "O_BINARY"):
            flag |= os.O_BINARY
        try:
            self.fd = os.open(filename, flag)
        except OSError:
            # Opening in mode 'a' or 'a+' and the file already exists:
            # O_EXCL made os.open() fail.  Retry without the create bits,
            # keeping the access mode (and O_BINARY, if set) intact.
            # BUG FIX: the old code masked with os.O_RDWR | os.O_BINARY,
            # which (a) raised AttributeError on platforms without
            # os.O_BINARY and (b) dropped O_WRONLY, silently reopening
            # plain 'a' files read-only.
            flag &= ~(os.O_CREAT | os.O_EXCL)
            self.fd = os.open(filename, flag)
        if mode[0] == 'a':
            os.lseek(self.fd, 0, 2)     # Move to end of file

    def seek(self, offset, whence=0):
        os.lseek(self.fd, offset, whence)

    def tell(self):
        # lseek with offset 0 from the current position reports it.
        return os.lseek(self.fd, 0, 1)

    def read(self, n):
        return os.read(self.fd, n)

    def write(self, data):
        # os.write() may write fewer bytes than requested; loop until done.
        while data:
            n = os.write(self.fd, data)
            data = data[n:]

    def close(self):
        fd = self.fd
        if fd is not None:
            self.fd = None      # clear first so a failed close isn't retried
            os.close(fd)

    def __del__(self):
        try:
            self.close()
        except:
            pass
+
class TextInputFilter(object):

    """Filtering input stream for universal newline translation."""

    def __init__(self, base):
        self.base = base    # must implement read, may implement tell, seek
        self.atcr = False   # Set when last char read was \r
        self.buf = ""       # Optional one-character read-ahead buffer
        self.close = base.close
        # Which separators have been seen so far (feeds 'newlines').
        self.CR = False
        self.NL = False
        self.CRLF = False

    def __getattr__(self, name):
        # Only the computed 'newlines' attribute is synthesized here.
        # BUG FIX: the old code fell off the end for any other name and
        # implicitly returned None instead of raising AttributeError,
        # masking genuine attribute errors on this object.
        if name == 'newlines':
            foundchars = self.CR * 1 + self.NL * 2 + self.CRLF * 4
            if not foundchars:
                return None
            if foundchars in [1, 2, 4]:
                # Exactly one kind seen: report it as a plain string.
                if self.CR:
                    return '\r'
                elif self.NL:
                    return '\n'
                else:
                    return '\r\n'
            else:
                result = []
                if self.CR:
                    result.append('\r')
                if self.NL:
                    result.append('\n')
                if self.CRLF:
                    result.append('\r\n')
                return tuple(result)
        raise AttributeError(name)

    def read(self, n):
        """Read up to n bytes."""
        if n <= 0:
            return ""
        if self.buf:
            assert not self.atcr
            data = self.buf
            self.buf = ""
            return data
        data = self.base.read(n)

        # The following whole ugly mess is because we need to keep track of
        # exactly which line separators we have seen for self.newlines,
        # grumble, grumble.  This has an interesting corner-case.
        #
        # Consider a file consisting of exactly one line ending with '\r'.
        # The first time you read(), you will not know whether it is a
        # CR separator or half of a CRLF separator.  Neither will be marked
        # as seen, since you are waiting for your next read to determine
        # what you have seen.  But there's no more to read ...

        if self.atcr:
            if data.startswith("\n"):
                data = data[1:]
                self.CRLF = True
                if not data:
                    data = self.base.read(n)
            else:
                self.CR = True
            self.atcr = False

        for i in range(len(data)):
            if data[i] == '\n':
                if i > 0 and data[i-1] == '\r':
                    self.CRLF = True
                else:
                    self.NL = True
            elif data[i] == '\r':
                if i < len(data)-1 and data[i+1] != '\n':
                    self.CR = True

        if "\r" in data:
            self.atcr = data.endswith("\r")
            data = data.replace("\r\n", "\n").replace("\r", "\n")

        return data

    def seek(self, offset, whence=0):
        """Seeks based on knowledge that does not come from a tell()
           may go to the wrong place, since the number of
           characters seen may not match the number of characters
           that are actually in the file (where \r\n is the
           line separator). Arithmetics on the result
           of a tell() that moves beyond a newline character may in the
           same way give the wrong result.
        """
        self.base.seek(offset, whence)
        self.atcr = False
        self.buf = ""

    def tell(self):
        pos = self.base.tell()
        if self.atcr:
            # Must read the next byte to see if it's \n,
            # because then we must report the next position.
            assert not self.buf
            self.buf = self.base.read(1)
            pos += 1
            self.atcr = False
            if self.buf == "\n":
                self.buf = ""
        return pos - len(self.buf)
+
class TextOutputFilter(object):

    """Filtering output stream for universal newline translation."""

    def __init__(self, base, linesep=os.linesep):
        assert linesep in ["\n", "\r\n", "\r"]
        self.base = base    # must implement write, may implement seek, tell
        self.linesep = linesep
        self.close = base.close

    def write(self, data):
        """Write data, translating newline characters to self.linesep."""
        # BUG FIX: the test used to be ``self.linesep is not "\n"`` --
        # an identity comparison with a literal that only worked thanks
        # to CPython string interning; use equality instead.
        if self.linesep != "\n" and "\n" in data:
            data = data.replace("\n", self.linesep)
        self.base.write(data)

    def seek(self, offset, whence=0):
        self.base.seek(offset, whence)

    def tell(self):
        return self.base.tell()
+
class DecodingInputFilter(object):

    """Filtering input stream that decodes an encoded file."""

    def __init__(self, base, encoding="utf8", errors="strict"):
        self.base = base
        self.encoding = encoding
        self.errors = errors
        # tell/seek/close pass straight through to the base stream.
        self.tell = base.tell
        self.seek = base.seek
        self.close = base.close

    def read(self, n):
        """Read *approximately* n bytes, then decode them.

        Under extreme circumstances,
        the return length could be longer than n!

        Always return a unicode string.

        This does *not* translate newlines;
        you can stack TextInputFilter.
        """
        raw = self.base.read(n)
        try:
            return raw.decode(self.encoding, self.errors)
        except ValueError:
            # The chunk probably ends mid-way through a multibyte
            # sequence; pull in up to 9 more bytes one at a time and
            # retry (same strategy as codecs.StreamReader).
            for _ in range(9):
                tail = self.base.read(1)
                if not tail:
                    raise
                raw += tail
                try:
                    return raw.decode(self.encoding, self.errors)
                except ValueError:
                    continue
            raise
+
class EncodingOutputFilter(object):

    """Filtering output stream that writes to an encoded file."""

    def __init__(self, base, encoding="utf8", errors="strict"):
        self.base = base
        self.encoding = encoding
        self.errors = errors
        # tell/seek/close pass straight through to the base stream.
        self.tell = base.tell
        self.seek = base.seek
        self.close = base.close

    def write(self, chars):
        """Encode *chars* and write the resulting bytes to the base.

        A plain (byte) string is first coerced to unicode, so passing
        non-ASCII byte strings raises; callers should pass unicode.
        """
        if isinstance(chars, str):
            chars = unicode(chars) # Fail if it's not ASCII
        self.base.write(chars.encode(self.encoding, self.errors))
Added: pypy/trunk/src/pypy/appspace/test/test_file.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/test/test_file.py Sat Nov 20 19:52:21 2004
@@ -0,0 +1,20 @@
+import autopath
+from pypy.appspace import _file
+import unittest
+
class FileTestCase(unittest.TestCase):
    # Uses this test file itself as a read-only fixture.
    def setUp(self):
        self.fd = _file.file_('test_file.py', 'r')

    def tearDown(self):
        self.fd.close()

    def test_case_1(self):
        # A freshly opened file starts at offset 0.
        self.assertEquals(self.fd.tell(), 0)
+
def test_main():
    # Entry point used by the test driver; delegates to unittest.
    unittest.main()


if __name__ == "__main__":
    test_main()
Added: pypy/trunk/src/pypy/appspace/test/test_sio.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/test/test_sio.py Sat Nov 20 19:52:21 2004
@@ -0,0 +1,673 @@
+"""Unit tests for sio (new standard I/O)."""
+
+import os
+import time
+import tempfile
+import unittest
+
+import sio
+
class TestSource(object):

    """Fake raw input stream that hands data out in fixed packets."""

    def __init__(self, packets):
        # Empty packets are forbidden: read() treats "" as EOF.
        assert all(packets)
        self.orig_packets = list(packets)
        self.packets = list(packets)
        self.pos = 0

    def tell(self):
        return self.pos

    def seek(self, offset, whence=0):
        # Normalize to an absolute offset, then replay from the start.
        if whence == 1:
            offset += self.pos
        elif whence == 2:
            offset += sum([len(p) for p in self.orig_packets])
        else:
            assert whence == 0
        self.packets = list(self.orig_packets)
        self.pos = 0
        while self.pos < offset:
            if not self.read(offset - self.pos):
                break
        assert self.pos == offset

    def read(self, n):
        if not self.packets:
            return ""
        data = self.packets.pop(0)
        if len(data) > n:
            # Hand back at most n bytes; requeue the remainder.
            self.packets.insert(0, data[n:])
            data = data[:n]
        self.pos += len(data)
        return data

    def close(self):
        pass
+
class TestReader(object):

    """Fake raw input stream that hands data out in fixed packets.

    NOTE(review): this class is byte-for-byte identical to TestSource
    above; the two could be merged into one helper.
    """

    def __init__(self, packets):
        # Empty packets are forbidden: read() treats "" as EOF.
        for x in packets:
            assert x
        self.orig_packets = list(packets)
        self.packets = list(packets)
        self.pos = 0

    def tell(self):
        return self.pos

    def seek(self, offset, whence=0):
        # Normalize to an absolute offset, then replay from the start.
        if whence == 1:
            offset += self.pos
        elif whence == 2:
            for packet in self.orig_packets:
                offset += len(packet)
        else:
            assert whence == 0
        self.packets = list(self.orig_packets)
        self.pos = 0
        while self.pos < offset:
            data = self.read(offset - self.pos)
            if not data:
                break
        assert self.pos == offset

    def read(self, n):
        try:
            data = self.packets.pop(0)
        except IndexError:
            return ""
        if len(data) > n:
            # Hand back at most n bytes; requeue the remainder.
            data, rest = data[:n], data[n:]
            self.packets.insert(0, rest)
        self.pos += len(data)
        return data

    def close(self):
        pass
+
class TestWriter(object):

    """Fake raw output stream recording everything written to it."""

    def __init__(self):
        self.buf = ""
        self.pos = 0

    def write(self, data):
        here, image = self.pos, self.buf
        if here >= len(image):
            # Writing at or past EOF: zero-fill any gap, then append.
            self.buf = image + "\0" * (here - len(image)) + data
            self.pos = len(self.buf)
        else:
            # Overwrite in place (may extend past the old end).
            self.buf = image[:here] + data + image[here + len(data):]
            self.pos = here + len(data)

    def tell(self):
        return self.pos

    def seek(self, offset, whence=0):
        if whence == 1:
            offset += self.pos
        elif whence == 2:
            offset += len(self.buf)
        elif whence != 0:
            raise ValueError("whence should be 0, 1 or 2")
        # Position is clamped at 0.
        self.pos = max(offset, 0)

    def close(self):
        pass
+
class BufferingInputStreamTests(unittest.TestCase):

    """Exercises BufferingInputStream over a packet-based fake source."""

    # Raw packets as handed out by the fake source, and the logical
    # lines the buffered stream should reassemble from them.
    packets = ["a", "b", "\n", "def", "\nxy\npq\nuv", "wx"]
    lines = ["ab\n", "def\n", "xy\n", "pq\n", "uvwx"]

    def makeStream(self, tell=False, seek=False, bufsize=None):
        # tell/seek default to False: the base's methods are hidden so
        # the stream must cope without them.
        base = TestSource(self.packets)
        if not tell:
            base.tell = None
        if not seek:
            base.seek = None
        return sio.BufferingInputStream(base, bufsize)

    def test_readline(self):
        file = self.makeStream()
        self.assertEqual(list(iter(file.readline, "")), self.lines)

    def test_readlines(self):
        # This also tests next() and __iter__()
        file = self.makeStream()
        self.assertEqual(file.readlines(), self.lines)

    def test_readlines_small_bufsize(self):
        file = self.makeStream(bufsize=1)
        self.assertEqual(list(file), self.lines)

    def test_readall(self):
        file = self.makeStream()
        self.assertEqual(file.readall(), "".join(self.lines))

    def test_readall_small_bufsize(self):
        file = self.makeStream(bufsize=1)
        self.assertEqual(file.readall(), "".join(self.lines))

    def test_readall_after_readline(self):
        file = self.makeStream()
        self.assertEqual(file.readline(), self.lines[0])
        self.assertEqual(file.readline(), self.lines[1])
        self.assertEqual(file.readall(), "".join(self.lines[2:]))

    def test_read_1_after_readline(self):
        file = self.makeStream()
        self.assertEqual(file.readline(), "ab\n")
        self.assertEqual(file.readline(), "def\n")
        blocks = []
        while 1:
            block = file.read(1)
            if not block:
                break
            blocks.append(block)
        self.assertEqual(file.read(0), "")
        self.assertEqual(blocks, list("".join(self.lines)[7:]))

    def test_read_1(self):
        file = self.makeStream()
        blocks = []
        while 1:
            block = file.read(1)
            if not block:
                break
            blocks.append(block)
        self.assertEqual(file.read(0), "")
        self.assertEqual(blocks, list("".join(self.lines)))

    def test_read_2(self):
        file = self.makeStream()
        blocks = []
        while 1:
            block = file.read(2)
            if not block:
                break
            blocks.append(block)
        self.assertEqual(file.read(0), "")
        self.assertEqual(blocks, ["ab", "\nd", "ef", "\nx", "y\n", "pq",
                                  "\nu", "vw", "x"])

    def test_read_4(self):
        file = self.makeStream()
        blocks = []
        while 1:
            block = file.read(4)
            if not block:
                break
            blocks.append(block)
        self.assertEqual(file.read(0), "")
        self.assertEqual(blocks, ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"])

    def test_read_4_after_readline(self):
        file = self.makeStream()
        self.assertEqual(file.readline(), "ab\n")
        self.assertEqual(file.readline(), "def\n")
        blocks = [file.read(4)]
        while 1:
            block = file.read(4)
            if not block:
                break
            blocks.append(block)
        self.assertEqual(file.read(0), "")
        self.assertEqual(blocks, ["xy\np", "q\nuv", "wx"])

    def test_read_4_small_bufsize(self):
        file = self.makeStream(bufsize=1)
        blocks = []
        while 1:
            block = file.read(4)
            if not block:
                break
            blocks.append(block)
        self.assertEqual(blocks, ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"])

    def test_tell_1(self):
        file = self.makeStream(tell=True)
        pos = 0
        while 1:
            self.assertEqual(file.tell(), pos)
            n = len(file.read(1))
            if not n:
                break
            pos += n

    def test_tell_1_after_readline(self):
        file = self.makeStream(tell=True)
        pos = 0
        pos += len(file.readline())
        self.assertEqual(file.tell(), pos)
        pos += len(file.readline())
        self.assertEqual(file.tell(), pos)
        while 1:
            self.assertEqual(file.tell(), pos)
            n = len(file.read(1))
            if not n:
                break
            pos += n

    def test_tell_2(self):
        file = self.makeStream(tell=True)
        pos = 0
        while 1:
            self.assertEqual(file.tell(), pos)
            n = len(file.read(2))
            if not n:
                break
            pos += n

    def test_tell_4(self):
        file = self.makeStream(tell=True)
        pos = 0
        while 1:
            self.assertEqual(file.tell(), pos)
            n = len(file.read(4))
            if not n:
                break
            pos += n

    def test_tell_readline(self):
        file = self.makeStream(tell=True)
        pos = 0
        while 1:
            self.assertEqual(file.tell(), pos)
            n = len(file.readline())
            if not n:
                break
            pos += n

    def test_seek(self):
        # Exhaustive: every (read amount, seek target, whence) combination.
        file = self.makeStream(tell=True, seek=True)
        all = file.readall()
        end = len(all)
        for readto in range(0, end+1):
            for seekto in range(0, end+1):
                for whence in 0, 1, 2:
                    file.seek(0)
                    self.assertEqual(file.tell(), 0)
                    head = file.read(readto)
                    self.assertEqual(head, all[:readto])
                    if whence == 1:
                        offset = seekto - readto
                    elif whence == 2:
                        offset = seekto - end
                    else:
                        offset = seekto
                    file.seek(offset, whence)
                    here = file.tell()
                    self.assertEqual(here, seekto)
                    rest = file.readall()
                    self.assertEqual(rest, all[seekto:])

    def test_seek_noseek(self):
        # Forward-only seeks must work even when the base can't seek.
        file = self.makeStream()
        all = file.readall()
        end = len(all)
        for readto in range(0, end+1):
            for seekto in range(readto, end+1):
                for whence in 1, 2:
                    file = self.makeStream()
                    head = file.read(readto)
                    self.assertEqual(head, all[:readto])
                    if whence == 1:
                        offset = seekto - readto
                    elif whence == 2:
                        offset = seekto - end
                    file.seek(offset, whence)
                    rest = file.readall()
                    self.assertEqual(rest, all[seekto:])
+
class BufferingOutputStreamTests(unittest.TestCase):

    """Exercises BufferingOutputStream against a recording fake writer."""

    def test_write(self):
        base = TestWriter()
        filter = sio.BufferingOutputStream(base, 4)
        filter.write("123")
        # Less than bufsize: nothing reaches the base yet.
        self.assertEqual(base.buf, "")
        self.assertEquals(filter.tell(), 3)
        filter.write("456")
        self.assertEqual(base.buf, "1234")
        filter.write("789ABCDEF")
        self.assertEqual(base.buf, "123456789ABC")
        filter.write("0123")
        self.assertEqual(base.buf, "123456789ABCDEF0")
        self.assertEquals(filter.tell(), 19)
        filter.close()
        # close() flushes the remainder.
        self.assertEqual(base.buf, "123456789ABCDEF0123")

    def test_write_seek(self):
        base = TestWriter()
        filter = sio.BufferingOutputStream(base, 4)
        filter.write("x"*6)
        filter.seek(3)
        filter.write("y"*2)
        filter.close()
        self.assertEqual(base.buf, "x"*3 + "y"*2 + "x"*1)
+
class LineBufferingOutputStreamTests(unittest.TestCase):

    """Exercises LineBufferingOutputStream against a recording writer."""

    def test_write(self):
        base = TestWriter()
        filter = sio.LineBufferingOutputStream(base)
        filter.bufsize = 4 # More handy for testing than the default
        filter.write("123")
        self.assertEqual(base.buf, "")
        self.assertEquals(filter.tell(), 3)
        filter.write("456")
        self.assertEqual(base.buf, "1234")
        filter.write("789ABCDEF\n")
        # A newline flushes everything buffered so far.
        self.assertEqual(base.buf, "123456789ABCDEF\n")
        filter.write("0123")
        self.assertEqual(base.buf, "123456789ABCDEF\n0123")
        self.assertEquals(filter.tell(), 20)
        filter.close()
        self.assertEqual(base.buf, "123456789ABCDEF\n0123")

    # NOTE(review): the 'xtest_' prefix keeps this test disabled
    # (buffered seek-and-rewrite does not work yet, per the commit log).
    def xtest_write_seek(self):
        base = TestWriter()
        filter = sio.BufferingOutputStream(base, 4)
        filter.write("x"*6)
        filter.seek(3)
        filter.write("y"*2)
        filter.close()
        self.assertEqual(base.buf, "x"*3 + "y"*2 + "x"*1)
+
class CRLFFilterTests(unittest.TestCase):
    """sio.CRLFFilter must translate lone "\r" and "\r\n" endings to
    "\n", including a "\r\n" pair split across two source packets."""

    def test_filter(self):
        packets = ["abc\ndef\rghi\r\nxyz\r", "123\r", "\n456"]
        expected = ["abc\ndef\nghi\nxyz\n", "123\n", "456"]
        crlf = sio.CRLFFilter(TestSource(packets))
        pieces = []
        chunk = crlf.read(100)
        while chunk:
            pieces.append(chunk)
            chunk = crlf.read(100)
        self.assertEqual(pieces, expected)
+
class MMapFileTests(BufferingInputStreamTests):
    """Re-run the buffered-input tests against sio.MMapFile.

    All read/seek tests are inherited from BufferingInputStreamTests;
    only makeStream() is overridden to hand back an mmap-backed file,
    so a real temporary file is written to disk for every test.
    """

    # Path of the temp file backing the current stream (None when absent).
    tfn = None

    def tearDown(self):
        # Best-effort removal of the temp file created by makeStream();
        # clear self.tfn first so a failure here is not retried.
        tfn = self.tfn
        if tfn:
            self.tfn = None
            try:
                os.remove(tfn)
            except os.error, msg:
                print "can't remove %s: %s" % (tfn, msg)

    def makeStream(self, tell=None, seek=None, bufsize=None, mode="r"):
        # tell/seek/bufsize are accepted only for signature compatibility
        # with the base class's makeStream(); MMapFile ignores them.
        self.tfn = tempfile.mktemp()
        f = open(self.tfn, "wb")
        f.writelines(self.packets)
        f.close()
        return sio.MMapFile(self.tfn, mode)

    def test_write(self):
        # Overrides the inherited test: write/read back through the mmap.
        if os.name == "posix":
            return # write() does't work on Unix :-(
        file = self.makeStream(mode="w")
        file.write("BooHoo\n")
        file.write("Barf\n")
        file.writelines(["a\n", "b\n", "c\n"])
        self.assertEqual(file.tell(), len("BooHoo\nBarf\na\nb\nc\n"))
        file.seek(0)
        self.assertEqual(file.read(), "BooHoo\nBarf\na\nb\nc\n")
        file.seek(0)
        self.assertEqual(file.readlines(),
                         ["BooHoo\n", "Barf\n", "a\n", "b\n", "c\n"])
        self.assertEqual(file.tell(), len("BooHoo\nBarf\na\nb\nc\n"))
+
class TextInputFilterTests(unittest.TestCase):
    """Tests for sio.TextInputFilter: universal-newline translation on
    input, tell()/seek() in translated positions, and the newlines
    attribute.

    Fix: test_newlines_attribute now uses assertEqual instead of the
    deprecated assertEquals alias.
    """

    # Raw chunks as returned by successive reads of the base stream;
    # "\r\n" pairs are deliberately split across chunk boundaries.
    packets = [
        "foo\r",
        "bar\r",
        "\nfoo\r\n",
        "abc\ndef\rghi\r\nxyz",
        "\nuvw\npqr\r",
        "\n",
        "abc\n",
    ]
    # (translated data, tell() position) per 100-byte read.
    expected = [
        ("foo\n", 4),
        ("bar\n", 9),
        ("foo\n", 14),
        ("abc\ndef\nghi\nxyz", 30),
        ("\nuvw\npqr\n", 40),
        ("abc\n", 44),
        ("", 44),
        ("", 44),
    ]

    # Same stream, but a tell() between reads splits "bar\n" in two.
    expected_with_tell = [
        ("foo\n", 4),
        ("b", 5),
        ("ar\n", 9),
        ("foo\n", 14),
        ("abc\ndef\nghi\nxyz", 30),
        ("\nuvw\npqr\n", 40),
        ("abc\n", 44),
        ("", 44),
        ("", 44),
    ]

    # (packets, expected value of the newlines attribute after each read).
    expected_newlines = [
        (["abcd"], [None]),
        (["abcd\n"], ["\n"]),
        (["abcd\r\n"],["\r\n"]),
        (["abcd\r"],[None]), # wrong, but requires precognition to fix
        (["abcd\r", "\nefgh"], [None, "\r\n"]),
        (["abcd", "\nefg\r", "hij", "k\r\n"], [None, "\n", ("\r", "\n"),
                                               ("\r", "\n", "\r\n")]),
        (["abcd", "\refg\r", "\nhij", "k\n"], [None, "\r", ("\r", "\r\n"),
                                               ("\r", "\n", "\r\n")])
    ]

    def test_read(self):
        """Each read(100) returns newline-translated data."""
        base = TestReader(self.packets)
        filter = sio.TextInputFilter(base)
        # pos is unused here; positions are checked by test_read_tell.
        for data, pos in self.expected:
            self.assertEqual(filter.read(100), data)

    def test_read_tell(self):
        """tell() reports translated positions and is repeatable."""
        base = TestReader(self.packets)
        filter = sio.TextInputFilter(base)
        for data, pos in self.expected_with_tell:
            self.assertEqual(filter.read(100), data)
            self.assertEqual(filter.tell(), pos)
            self.assertEqual(filter.tell(), pos) # Repeat the tell() !
    def test_seek(self):
        """Seeking back to any previously observed tell() position
        must replay exactly the data that followed it."""
        base = TestReader(self.packets)
        filter = sio.TextInputFilter(base)
        # First pass: record (data-so-far, tell()) at every character.
        sofar = ""
        pairs = []
        while True:
            pairs.append((sofar, filter.tell()))
            c = filter.read(1)
            if not c:
                break
            self.assertEqual(len(c), 1)
            sofar += c
        all = sofar
        # Second pass: seek to each recorded position and re-read the rest.
        for i in range(len(pairs)):
            sofar, pos = pairs[i]
            filter.seek(pos)
            self.assertEqual(filter.tell(), pos)
            self.assertEqual(filter.tell(), pos)
            bufs = [sofar]
            while True:
                data = filter.read(100)
                if not data:
                    self.assertEqual(filter.read(100), "")
                    break
                bufs.append(data)
            self.assertEqual("".join(bufs), all)

    def test_newlines_attribute(self):
        """The newlines attribute accumulates the kinds of line endings
        seen so far, mirroring CPython's universal-newline files."""
        for packets, expected in self.expected_newlines:
            base = TestReader(packets)
            filter = sio.TextInputFilter(base)
            for e in expected:
                filter.read(100)
                self.assertEqual(filter.newlines, e)
+
class TextOutputFilterTests(unittest.TestCase):
    """sio.TextOutputFilter translates "\n" in written text to the
    configured line separator and reports positions in translated
    (on-disk) bytes."""

    def _written_with(self, linesep):
        # Push three writes whose "\n"s fall at a chunk boundary,
        # mid-chunk, and chunk-final; return the base buffer.
        base = TestWriter()
        filter = sio.TextOutputFilter(base, linesep=linesep)
        filter.write("abc")
        filter.write("def\npqr\nuvw")
        filter.write("\n123\n")
        return base.buf

    def _tell_after(self, linesep):
        # tell() after "xxx" then "\nabc\n"; the first tell() is always 3.
        base = TestWriter()
        filter = sio.TextOutputFilter(base, linesep=linesep)
        filter.write("xxx")
        self.assertEqual(filter.tell(), 3)
        filter.write("\nabc\n")
        return filter.tell()

    def test_write_nl(self):
        self.assertEqual(self._written_with("\n"),
                         "abcdef\npqr\nuvw\n123\n")

    def test_write_cr(self):
        self.assertEqual(self._written_with("\r"),
                         "abcdef\rpqr\ruvw\r123\r")

    def test_write_crnl(self):
        self.assertEqual(self._written_with("\r\n"),
                         "abcdef\r\npqr\r\nuvw\r\n123\r\n")

    def test_write_tell_nl(self):
        self.assertEqual(self._tell_after("\n"), 8)

    def test_write_tell_cr(self):
        self.assertEqual(self._tell_after("\r"), 8)

    def test_write_tell_crnl(self):
        # Two "\n"s expand to "\r\n", so the position grows by two extra.
        self.assertEqual(self._tell_after("\r\n"), 10)

    def test_write_seek(self):
        base = TestWriter()
        filter = sio.TextOutputFilter(base, linesep="\n")
        filter.write("x"*100)
        filter.seek(50)
        filter.write("y"*10)
        self.assertEqual(base.buf, "x"*50 + "y"*10 + "x"*40)
+
class DecodingInputFilterTests(unittest.TestCase):
    """sio.DecodingInputFilter must turn a UTF-8 byte stream into
    unicode for any read-chunk size, even when a multi-byte sequence is
    split across reads."""

    def test_read(self):
        chars = u"abc\xff\u1234\u4321\x80xyz"
        data = chars.encode("utf8")
        for n in range(1, 11):
            # Bug fix: base/filter/bufs used to be created once outside
            # this loop, so the stream was exhausted after the first
            # pass and chunk sizes 2..10 were never exercised. Rebuild
            # per n, matching EncodingOutputFilterTests.test_write.
            base = TestReader([data])
            filter = sio.DecodingInputFilter(base)
            bufs = []
            while 1:
                c = filter.read(n)
                self.assertEqual(type(c), unicode)
                if not c:
                    break
                bufs.append(c)
            self.assertEqual(u"".join(bufs), chars)
+
class EncodingOutputFilterTests(unittest.TestCase):
    """sio.EncodingOutputFilter must emit the UTF-8 encoding of written
    unicode text, regardless of how the text is split across writes."""

    def test_write(self):
        chars = u"abc\xff\u1234\u4321\x80xyz"
        data = chars.encode("utf8")
        for size in range(1, 11):
            base = TestWriter()
            filter = sio.EncodingOutputFilter(base)
            # Feed the text in slices of the current size.
            for start in range(0, len(chars), size):
                filter.write(chars[start:start+size])
            self.assertEqual(base.buf, data)
+
+# Speed test
+
+FN = "BIG"
+
+def timeit(fn=FN, opener=sio.MMapFile):
+ f = opener(fn, "r")
+ lines = bytes = 0
+ t0 = time.clock()
+ for line in f:
+ lines += 1
+ bytes += len(line)
+ t1 = time.clock()
+ print "%d lines (%d bytes) in %.3f seconds for %s" % (
+ lines, bytes, t1-t0, opener.__name__)
+
def speed_main():
    """Compare line-iteration speed of a buffered DiskFile, an
    MMapFile and the builtin open() over the file named by FN."""
    def diskopen(fn, mode):
        base = sio.DiskFile(fn, mode)
        return sio.BufferingInputStream(base)
    # Same order as before: buffered DiskFile, MMapFile, builtin open.
    for opener in (diskopen, sio.MMapFile, open):
        timeit(opener=opener)
+
+# Functional test
+
+def functional_main():
+ f = sio.DiskFile("sio.py")
+ f = sio.DecodingInputFilter(f)
+ f = sio.TextInputFilter(f)
+ f = sio.BufferingInputStream(f)
+ for i in range(10):
+ print repr(f.readline())
+
def makeSuite():
    """Collect every TestCase class in this module into one suite."""
    test_classes = (
        BufferingInputStreamTests,
        BufferingOutputStreamTests,
        LineBufferingOutputStreamTests,
        CRLFFilterTests,
        MMapFileTests,
        TextInputFilterTests,
        TextOutputFilterTests,
        DecodingInputFilterTests,
        EncodingOutputFilterTests,
    )
    suite = unittest.TestSuite()
    for cls in test_classes:
        suite.addTest(unittest.makeSuite(cls))
    return suite
+
if __name__ == "__main__":
    # Run the full suite with the plain text runner when invoked directly.
    runner = unittest.TextTestRunner()
    runner.run(makeSuite())
More information about the Pypy-commit
mailing list