[pypy-svn] r7530 - in pypy/trunk/src/pypy/appspace: . test

jacob at codespeak.net jacob at codespeak.net
Sat Nov 20 19:52:22 CET 2004


Author: jacob
Date: Sat Nov 20 19:52:21 2004
New Revision: 7530

Added:
   pypy/trunk/src/pypy/appspace/_file.py
   pypy/trunk/src/pypy/appspace/sio.py
   pypy/trunk/src/pypy/appspace/test/test_file.py
   pypy/trunk/src/pypy/appspace/test/test_sio.py
Log:
Added builtin file with support. Reading and writing work both buffered and unbuffered. Writing works linebuffered as well. Universal newline support works. Buffered combined read/write does not work yet. Linebuffered read does not work yet.

Added: pypy/trunk/src/pypy/appspace/_file.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/_file.py	Sat Nov 20 19:52:21 2004
@@ -0,0 +1,51 @@
+import sio
+
+class file_(object):
+    """An implementation of file objects in Python. it relies on Guido's
+       sio.py implementation.
+    """
+    def __init__(self, filename, mode='r', bufsize=None):
+        self.reading = False
+        self.writing = False
+        
+        if not mode:
+            raise IOError('invalid mode :  ')
+        if mode[0] not in ['r', 'w', 'a', 'U']:
+            raise IOError('invalid mode : %s' % mode)
+        else:
+            if mode[0] in ['r', 'U']:
+                self.reading = True
+            else:
+                self.writing = True
+        try:
+            if mode[1] == 'b':
+                plus = mode[2]
+            else:
+                plus = mode[1]
+            if plus == '+':
+                self.reading = self.writing = True
+        except IndexError:
+            pass
+        
+        self.fd = sio.DiskFile(filename, mode)
+        if mode in ['U', 'rU']:
+            # Wants universal newlines
+            self.fd = sio.TextInputFilter(self.fd)
+        if bufsize < 0:
+            bufsize = None
+        if not self.writing and (bufsize is None or bufsize > 0):
+            self.fd = sio.BufferingInputStream(self.fd, bufsize)
+        if not self.reading:
+            if bufsize is None or bufsize > 1:
+                self.fd = sio.BufferingOutputStream(self.fd, bufsize)
+            elif bufsize == 1:
+                self.fd = sio.LineBufferingOutputStream(self.fd)
+        return self.fd
+
+    def __getattr__(self, name):
+        """
+        Delegate all other methods to the underlying file object.
+        """
+        return getattr(self.fd, name)
+
+    

Added: pypy/trunk/src/pypy/appspace/sio.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/sio.py	Sat Nov 20 19:52:21 2004
@@ -0,0 +1,790 @@
+"""New standard I/O library.
+
+This code is still very young and experimental!
+
+There are fairly complete unit tests in test_sio.py.
+
+The design is simple:
+
+- A raw stream supports read(n), write(s), seek(offset, whence=0) and
+  tell().  This is generally unbuffered.  Raw streams may support
+  Unicode.
+
+- A basis stream provides the raw stream API and builds on a much more
+  low-level API, e.g. the os, mmap or socket modules.
+
+- A filtering stream is raw stream built on top of another raw stream.
+  There are filtering streams for universal newline translation and
+  for unicode translation.
+
+- A buffering stream supports the full classic Python I/O API:
+  read(n=-1), readline(), readlines(sizehint=0), tell(), seek(offset,
+  whence=0), write(s), writelines(lst), as well as __iter__() and
+  next().  (There's also readall() but that's a synonym for read()
+  without arguments.)  This is a superset of the raw stream API.  I
+  haven't thought about fileno() and isatty() yet, nor about
+  truncate() or the various attributes like name and mode.  Also,
+  close() is not implemented right.  We really need only one buffering
+  stream implementation, which is a filtering stream.
+
+You typically take a basis stream, place zero or more filtering
+streams on top of it, and then top it off with a buffering stream.
+
+"""
+
+import os
+import mmap
+
+class BufferingInputStream(object):
+
+    """Standard buffering input stream.
+
+    This is typically the top of the stack.
+    """
+
+    bigsize = 2**19 # Half a Meg
+    bufsize = 2**13 # 8 K
+
+    def __init__(self, base, bufsize=None):
+        self.do_read = getattr(base, "read", None)
+                       # function to fill buffer some more
+        self.do_tell = getattr(base, "tell", None)
+                       # None, or return a byte offset 
+        self.do_seek = getattr(base, "seek", None)
+                       # None, or seek to a byte offset
+        self.close = base.close
+
+        if bufsize is None:     # Get default from the class
+            bufsize = self.bufsize
+        self.bufsize = bufsize  # buffer size (hint only)
+        self.lines = []         # ready-made lines (sans "\n")
+        self.buf = ""           # raw data (may contain "\n")
+        # Invariant: readahead == "\n".join(self.lines + [self.buf])
+        # self.lines contains no "\n"
+        # self.buf may contain "\n"
+
+    def tell(self):
+        bytes = self.do_tell()  # This may fail
+        offset = len(self.buf)
+        for line in self.lines:
+            offset += len(line) + 1
+        assert bytes >= offset, (locals(), self.__dict__)
+        return bytes - offset
+
+    def seek(self, offset, whence=0):
+        # This may fail on the do_seek() or do_tell() call.
+        # But it won't call either on a relative forward seek.
+        # Nor on a seek to the very end.
+        if whence == 0 or (whence == 2 and self.do_seek is not None):
+            self.do_seek(offset, whence)
+            self.lines = []
+            self.buf = ""
+            return
+        if whence == 2:
+            # Skip relative to EOF by reading and saving only just as
+            # much as needed
+            assert self.do_seek is None
+            data = "\n".join(self.lines + [self.buf])
+            total = len(data)
+            buffers = [data]
+            self.lines = []
+            self.buf = ""
+            while 1:
+                data = self.do_read(self.bufsize)
+                if not data:
+                    break
+                buffers.append(data)
+                total += len(data)
+                while buffers and total >= len(buffers[0]) - offset:
+                    total -= len(buffers[0])
+                    del buffers[0]
+            cutoff = total + offset
+            if cutoff < 0:
+                raise TypeError, "cannot seek back"
+            if buffers:
+                buffers[0] = buffers[0][cutoff:]
+            self.buf = "".join(buffers)
+            self.lines = []
+            return
+        if whence == 1:
+            if offset < 0:
+                self.do_seek(self.tell() + offset, 0)
+                self.lines = []
+                self.buf = ""
+                return
+            while self.lines:
+                line = self.lines[0]
+                if offset <= len(line):
+                    self.lines[0] = line[offset:]
+                    return
+                offset -= len(self.lines[0]) - 1
+                del self.lines[0]
+            assert not self.lines
+            if offset <= len(self.buf):
+                self.buf = self.buf[offset:]
+                return
+            offset -= len(self.buf)
+            self.buf = ""
+            if self.do_seek is None:
+                self.read(offset)
+            else:
+                self.do_seek(offset, 1)
+            return
+        raise ValueError, "whence should be 0, 1 or 2"
+
+    def readall(self):
+        self.lines.append(self.buf)
+        more = ["\n".join(self.lines)]
+        self.lines = []
+        self.buf = ""
+        bufsize = self.bufsize
+        while 1:
+            data = self.do_read(bufsize)
+            if not data:
+                break
+            more.append(data)
+            bufsize = max(bufsize*2, self.bigsize)
+        return "".join(more)
+
+    def read(self, n=-1):
+        if n < 0:
+            return self.readall()
+
+        if self.lines:
+            # See if this can be satisfied from self.lines[0]
+            line = self.lines[0]
+            if len(line) >= n:
+                self.lines[0] = line[n:]
+                return line[:n]
+
+            # See if this can be satisfied *without exhausting* self.lines
+            k = 0
+            i = 0
+            for line in self.lines:
+                k += len(line)
+                if k >= n:
+                    lines = self.lines[:i]
+                    data = self.lines[i]
+                    cutoff = len(data) - (k-n)
+                    lines.append(data[:cutoff])
+                    self.lines[:i+1] = [data[cutoff:]]
+                    return "\n".join(lines)
+                k += 1
+                i += 1
+
+            # See if this can be satisfied from self.lines plus self.buf
+            if k + len(self.buf) >= n:
+                lines = self.lines
+                self.lines = []
+                cutoff = n - k
+                lines.append(self.buf[:cutoff])
+                self.buf = self.buf[cutoff:]
+                return "\n".join(lines)
+
+        else:
+            # See if this can be satisfied from self.buf
+            data = self.buf
+            k = len(data)
+            if k >= n:
+                cutoff = len(data) - (k-n)
+                self.buf = data[cutoff:]
+                return data[:cutoff]
+
+        lines = self.lines
+        self.lines = []
+        lines.append(self.buf)
+        self.buf = ""
+        data = "\n".join(lines)
+        more = [data]
+        k = len(data)
+        while k < n:
+            data = self.do_read(max(self.bufsize, n-k))
+            k += len(data)
+            more.append(data)
+            if not data:
+                break
+        cutoff = len(data) - (k-n)
+        self.buf = data[cutoff:]
+        more[-1] = data[:cutoff]
+        return "".join(more)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self.lines:
+            return self.lines.pop(0) + "\n"
+
+        # This block is needed because read() can leave self.buf
+        # containing newlines
+        self.lines = self.buf.split("\n")
+        self.buf = self.lines.pop()
+        if self.lines:
+            return self.lines.pop(0) + "\n"
+
+        buf = self.buf and [self.buf] or []
+        while 1:
+            self.buf = self.do_read(self.bufsize)
+            self.lines = self.buf.split("\n")
+            self.buf = self.lines.pop()
+            if self.lines:
+                buf.append(self.lines.pop(0))
+                buf.append("\n")
+                break
+            if not self.buf:
+                break
+            buf.append(self.buf)
+
+        line = "".join(buf)
+        if not line:
+            raise StopIteration
+        return line
+
+    def readline(self):
+        try:
+            return self.next()
+        except StopIteration:
+            return ""
+
+    def readlines(self, sizehint=0):
+        return list(self)
+
+class BufferingOutputStream(object):
+
+    """Standard buffering output stream.
+
+    This is typically the top of the stack.
+    """
+
+    bigsize = 2**19 # Half a Meg
+    bufsize = 2**13 # 8 K
+
+    def __init__(self, base, bufsize=None):
+        self.do_write = base.write # Flush buffer
+        self.do_tell = base.tell
+             # Return a byte offset; has to exist or this __init__() will fail
+        self.do_seek = getattr(base, "seek", None)
+                       # None, or seek to a byte offset
+        self.do_close = base.close # Close file
+
+        if bufsize is None:     # Get default from the class
+            bufsize = self.bufsize
+        self.bufsize = bufsize  # buffer size (hint only)
+        self.buf = ""
+        self.tell()
+        
+    def tell(self):
+        assert self.do_tell is not None
+        if not hasattr(self, 'pos'):
+            self.pos = self.do_tell()
+
+        return self.pos
+            
+    def seek(self, offset, whence=0):
+        self.do_write(self.buf)
+        self.buf = ''
+        self.do_seek(offset, whence)
+        self.pos = self.do_tell()
+
+    def write(self, data):
+        buflen = len(self.buf)
+        datalen = len(data)
+        if  datalen + buflen < self.bufsize:
+            self.buf += data
+            self.pos += datalen
+        else:
+            self.buf += data[:self.bufsize-buflen]
+            self.pos += self.bufsize-buflen
+            self.do_write(self.buf)
+            self.buf = ''
+            self.write(data[self.bufsize-buflen:])
+
+    def close(self):
+        self.do_write(self.buf)
+        self.buf = ''
+        if self.do_close():
+            self.do_close()
+
+class LineBufferingOutputStream(BufferingOutputStream):
+
+    """Line buffering output stream.
+
+    This is typically the top of the stack.
+    """
+
+    def __init__(self, base, bufsize=None):
+        self.do_write = base.write # Flush buffer
+        self.do_tell = base.tell
+             # Return a byte offset; has to exist or this __init__() will fail
+        self.do_seek = getattr(base, "seek", None)
+                       # None, or seek to a byte offset
+        self.do_close = base.close # Close file
+
+        self.linesep = os.linesep
+        self.buf = ""           # raw data (may contain "\n")
+        self.tell()
+        
+    def tell(self):
+        assert self.do_tell is not None
+        if not hasattr(self, 'pos'):
+            self.pos = self.do_tell()
+
+        return self.pos
+            
+    def seek(self, offset, whence=0):
+        self.do_write(self.buf)
+        self.buf = ''
+        self.do_seek(offset, whence)
+        self.pos = self.do_tell()
+
+    def write(self, data):
+        all_lines = data.split(self.linesep)
+        full_lines = all_lines[:-1]
+        for line in full_lines:
+            line += self.linesep
+            buflen = len(self.buf)
+            linelen = len(line)
+            if  linelen + buflen < self.bufsize:
+                self.buf += line
+                self.pos += linelen
+                self.do_write(self.buf)
+                self.buf = ''
+            else:
+                self.buf += line[:self.bufsize-buflen]
+                self.pos += self.bufsize-buflen
+                self.do_write(self.buf)
+                self.buf = ''
+                self.write(line[self.bufsize-buflen:])
+
+        # The last part of the split data never has a terminating linesep.
+        # If the data has a terminating linesep, the last element is an
+        # empty string.
+
+        line = all_lines[-1]
+        buflen = len(self.buf)
+        linelen = len(line)
+        if  linelen + buflen < self.bufsize:
+            self.buf += line
+            self.pos += linelen
+        else:
+            self.buf += line[:self.bufsize-buflen]
+            self.pos += self.bufsize-buflen
+            self.do_write(self.buf)
+            self.buf = ''
+            self.write(line[self.bufsize-buflen:])
+        
+    def close(self):
+        self.do_write(self.buf)
+        self.buf = ''
+        if self.do_close():
+            self.do_close()
+        
+class CRLFFilter(object):
+
+    """Filtering stream for universal newlines.
+
+    TextInputFilter is more general, but this is faster when you don't
+    need tell/seek.
+    """
+
+    def __init__(self, base):
+        self.do_read = base.read
+        self.atcr = False
+        self.close = base.close
+
+    def read(self, n):
+        data = self.do_read(n)
+        if self.atcr:
+            if data.startswith("\n"):
+                data = data[1:] # Very rare case: in the middle of "\r\n"
+            self.atcr = False
+        if "\r" in data:
+            self.atcr = data.endswith("\r")     # Test this before removing \r
+            data = data.replace("\r\n", "\n")   # Catch \r\n this first
+            data = data.replace("\r", "\n")     # Remaining \r are standalone
+        return data
+
+class MMapFile(object):
+
+    """Standard I/O basis stream using mmap."""
+
+    def __init__(self, filename, mode="r"):
+        self.filename = filename
+        self.mode = mode
+        if mode == "r":
+            flag = os.O_RDONLY
+            self.access = mmap.ACCESS_READ
+        else:
+            if mode == "w":
+                flag = os.O_RDWR | os.O_CREAT
+            elif mode == "a":
+                flag = os.O_RDWR
+            else:
+                raise ValueError, "mode should be 'r', 'w' or 'a'"
+            self.access = mmap.ACCESS_WRITE
+        if hasattr(os, "O_BINARY"):
+            flag |= os.O_BINARY
+        self.fd = os.open(filename, flag)
+        size = os.fstat(self.fd).st_size
+        self.mm = mmap.mmap(self.fd, size, access=self.access)
+        self.pos = 0
+
+    def __del__(self):
+        self.close()
+
+    mm = fd = None
+
+    def close(self):
+        if self.mm is not None:
+            self.mm.close()
+            self.mm = None
+        if self.fd is not None:
+            os.close(self.fd)
+            self.fd = None
+
+    def tell(self):
+        return self.pos
+
+    def seek(self, offset, whence=0):
+        if whence == 0:
+            self.pos = max(0, offset)
+        elif whence == 1:
+            self.pos = max(0, self.pos + offset)
+        elif whence == 2:
+            self.pos = max(0, self.mm.size() + offset)
+        else:
+            raise ValueError, "seek(): whence must be 0, 1 or 2"
+
+    def readall(self):
+        return self.read()
+
+    def read(self, n=-1):
+        if n >= 0:
+            aim = self.pos + n
+        else:
+            aim = self.mm.size() # Actual file size, may be more than mapped
+            n = aim - self.pos
+        data = self.mm[self.pos:aim]
+        if len(data) < n:
+            del data
+            # File grew since opened; remap to get the new data
+            size = os.fstat(self.fd).st_size
+            self.mm = mmap.mmap(self.fd, size, access=self.access)
+            data = self.mm[self.pos:aim]
+        self.pos += len(data)
+        return data
+
+    def __iter__(self):
+        return self
+
+    def readline(self):
+        hit = self.mm.find("\n", self.pos) + 1
+        if hit:
+            data = self.mm[self.pos:hit]
+            self.pos = hit
+            return data
+        # Remap the file just in case
+        size = os.fstat(self.fd).st_size
+        self.mm = mmap.mmap(self.fd, size, access=self.access)
+        hit = self.mm.find("\n", self.pos) + 1
+        if hit:
+            # Got a whole line after remapping
+            data = self.mm[self.pos:hit]
+            self.pos = hit
+            return data
+        # Read whatever we've got -- may be empty
+        data = self.mm[self.pos:self.mm.size()]
+        self.pos += len(data)
+        return data
+
+    def next(self):
+        hit = self.mm.find("\n", self.pos) + 1
+        if hit:
+            data = self.mm[self.pos:hit]
+            self.pos = hit
+            return data
+        # Remap the file just in case
+        size = os.fstat(self.fd).st_size
+        self.mm = mmap.mmap(self.fd, size, access=self.access)
+        hit = self.mm.find("\n", self.pos) + 1
+        if hit:
+            # Got a whole line after remapping
+            data = self.mm[self.pos:hit]
+            self.pos = hit
+            return data
+        # Read whatever we've got -- may be empty
+        data = self.mm[self.pos:self.mm.size()]
+        if not data:
+            raise StopIteration
+        self.pos += len(data)
+        return data
+
+    def readlines(self, sizehint=0):
+        return list(iter(self.readline, ""))
+
+    def write(self, data):
+        end = self.pos + len(data)
+        try:
+            self.mm[self.pos:end]  = data
+            # This can raise IndexError on Windows, ValueError on Unix
+        except (IndexError, ValueError):
+            # XXX On Unix, this resize() call doesn't work
+            self.mm.resize(end)
+            self.mm[self.pos:end]  = data
+        self.pos = end
+
+    def writelines(self, lines):
+        filter(self.write, lines)
+
+class DiskFile(object):
+
+    """Standard I/O basis stream using os.open/close/read/write/lseek"""
+    modes = {
+        'r'  : os.O_RDONLY,
+        'rb' : os.O_RDONLY,
+        'rU' : os.O_RDONLY,
+        'U'  : os.O_RDONLY,
+        'w'  : os.O_WRONLY,
+        'wb' : os.O_WRONLY,
+        'a'  : os.O_WRONLY | os.O_CREAT | os.O_EXCL,
+        'ab' : os.O_WRONLY | os.O_CREAT | os.O_EXCL,
+        'r+' : os.O_RDWR,
+        'rb+': os.O_RDWR,
+        'r+b': os.O_RDWR,
+        'w+' : os.O_RDWR | os.O_CREAT,
+        'wb+': os.O_RDWR | os.O_CREAT,
+        'w+b': os.O_RDWR | os.O_CREAT,
+        'a+' : os.O_RDWR | os.O_CREAT | os.O_EXCL,
+        'ab+': os.O_RDWR | os.O_CREAT | os.O_EXCL,
+        'a+b': os.O_RDWR | os.O_CREAT | os.O_EXCL,
+        }
+    def __init__(self, filename, mode="r"):
+        self.filename = filename
+        self.mode = mode
+        try:
+            flag = DiskFile.modes[mode]
+        except KeyError:
+            raise ValueError, "mode should be 'r', 'r+', 'w', 'w+' or 'a+'"
+
+        if hasattr(os, "O_BINARY"):
+            flag |= os.O_BINARY
+        try:
+            self.fd = os.open(filename, flag)
+        except OSError:
+            # Opening in mode 'a' or 'a+' and file already exists
+            flag = flag & (os.O_RDWR | os.O_BINARY)
+            self.fd = os.open(filename, flag)
+        if mode[0] == 'a':
+            os.lseek(self.fd, 0, 2) # Move to end of file
+
+    def seek(self, offset, whence=0):
+        os.lseek(self.fd, offset, whence)
+
+    def tell(self):
+        return os.lseek(self.fd, 0, 1)
+
+    def read(self, n):
+        return os.read(self.fd, n)
+
+    def write(self, data):
+        while data:
+            n = os.write(self.fd, data)
+            data = data[n:]
+
+    def close(self):
+        fd = self.fd
+        if fd is not None:
+            self.fd = None
+            os.close(fd)
+
+    def __del__(self):
+        try:
+            self.close()
+        except:
+            pass
+
+class TextInputFilter(object):
+
+    """Filtering input stream for universal newline translation."""
+
+    def __init__(self, base):
+        self.base = base   # must implement read, may implement tell, seek
+        self.atcr = False  # Set when last char read was \r
+        self.buf = ""      # Optional one-character read-ahead buffer
+        self.close = base.close
+        self.CR = False
+        self.NL = False
+        self.CRLF = False
+        
+    def __getattr__(self, name):
+        if name == 'newlines':
+            foundchars = self.CR * 1 + self.NL * 2 + self.CRLF * 4
+            if  not foundchars:
+                return None
+            if foundchars in [1, 2, 4]:
+                if self.CR:
+                    return '\r'
+                elif self.NL:
+                    return '\n'
+                else:
+                    return '\r\n'
+            else:
+                result = []
+                if self.CR:
+                    result.append('\r')
+                if self.NL:
+                    result.append('\n')
+                if self.CRLF:
+                    result.append('\r\n')
+                return tuple(result)
+            
+    def read(self, n):
+        """Read up to n bytes."""
+        if n <= 0:
+            return ""
+        if self.buf:
+            assert not self.atcr
+            data = self.buf
+            self.buf = ""
+            return data
+        data = self.base.read(n)
+
+        # The following whole ugly mess is because we need to keep track of
+        # exactly which line separators we have seen for self.newlines,
+        # grumble, grumble.  This has an interesting corner-case.
+        #
+        # Consider a file consisting of exactly one line ending with '\r'.
+        # The first time you read(), you will not know whether it is a
+        # CR separator or half of a CRLF separator.  Neither will be marked
+        # as seen, since you are waiting for your next read to determine
+        # what you have seen.  But there's no more to read ...
+                        
+        if self.atcr:
+            if data.startswith("\n"):
+                data = data[1:]
+                self.CRLF = True
+                if not data:
+                    data = self.base.read(n)
+            else:
+                self.CR = True
+            self.atcr = False
+            
+        for i in range(len(data)):
+            if data[i] == '\n':
+                if i > 0 and data[i-1] == '\r':
+                    self.CRLF = True
+                else:
+                    self.NL = True
+            elif data[i] == '\r':
+                if i < len(data)-1 and data[i+1] != '\n':
+                    self.CR = True
+                    
+        if "\r" in data:
+            self.atcr = data.endswith("\r")
+            data = data.replace("\r\n", "\n").replace("\r", "\n")
+            
+        return data
+
+    def seek(self, offset, whence=0):
+        """Seeks based on knowledge that does not come from a tell()
+           may go to the wrong place, since the number of
+           characters seen may not match the number of characters
+           that are actually in the file (where \r\n is the
+           line separator). Arithmetics on the result
+           of a tell() that moves beyond a newline character may in the
+           same way give the wrong result.
+        """
+        self.base.seek(offset, whence)
+        self.atcr = False
+        self.buf = ""
+
+    def tell(self):
+        pos = self.base.tell()
+        if self.atcr:
+            # Must read the next byte to see if it's \n,
+            # because then we must report the next position.
+            assert not self.buf 
+            self.buf = self.base.read(1)
+            pos += 1
+            self.atcr = False
+            if self.buf == "\n":
+                self.buf = ""
+        return pos - len(self.buf)
+
+class TextOutputFilter(object):
+
+    """Filtering output stream for universal newline translation."""
+
+    def __init__(self, base, linesep=os.linesep):
+        assert linesep in ["\n", "\r\n", "\r"]
+        self.base = base    # must implement write, may implement seek, tell
+        self.linesep = linesep
+        self.close = base.close
+
+    def write(self, data):
+        if self.linesep is not "\n" and "\n" in data:
+            data = data.replace("\n", self.linesep)
+        self.base.write(data)
+
+    def seek(self, offset, whence=0):
+        self.base.seek(offset, whence)
+
+    def tell(self):
+        return self.base.tell()
+
+class DecodingInputFilter(object):
+
+    """Filtering input stream that decodes an encoded file."""
+
+    def __init__(self, base, encoding="utf8", errors="strict"):
+        self.base = base
+        self.encoding = encoding
+        self.errors = errors
+        self.tell = base.tell
+        self.seek = base.seek
+        self.close = base.close
+
+    def read(self, n):
+        """Read *approximately* n bytes, then decode them.
+
+        Under extreme circumstances,
+        the return length could be longer than n!
+
+        Always return a unicode string.
+
+        This does *not* translate newlines;
+        you can stack TextInputFilter.
+        """
+        data = self.base.read(n)
+        try:
+            return data.decode(self.encoding, self.errors)
+        except ValueError:
+            # XXX Sigh.  decode() doesn't handle incomplete strings well.
+            # Use the retry strategy from codecs.StreamReader.
+            for i in range(9):
+                more = self.base.read(1)
+                if not more:
+                    raise
+                data += more
+                try:
+                    return data.decode(self.encoding, self.errors)
+                except ValueError:
+                    pass
+            raise
+
+class EncodingOutputFilter(object):
+
+    """Filtering output stream that writes to an encoded file."""
+
+    def __init__(self, base, encoding="utf8", errors="strict"):
+        self.base = base
+        self.encoding = encoding
+        self.errors = errors
+        self.tell = base.tell
+        self.seek = base.seek
+        self.close = base.close
+
+    def write(self, chars):
+        if isinstance(chars, str):
+            chars = unicode(chars) # Fail if it's not ASCII
+        self.base.write(chars.encode(self.encoding, self.errors))

Added: pypy/trunk/src/pypy/appspace/test/test_file.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/test/test_file.py	Sat Nov 20 19:52:21 2004
@@ -0,0 +1,20 @@
+import autopath
+from pypy.appspace import _file
+import unittest
+
+class FileTestCase(unittest.TestCase):
+    def setUp(self):
+        self.fd = _file.file_('test_file.py', 'r')
+
+    def tearDown(self):
+        self.fd.close()
+        
+    def test_case_1(self):
+        self.assertEquals(self.fd.tell(), 0)
+
+def test_main():
+    unittest.main()
+
+
+if __name__ == "__main__":
+    test_main()

Added: pypy/trunk/src/pypy/appspace/test/test_sio.py
==============================================================================
--- (empty file)
+++ pypy/trunk/src/pypy/appspace/test/test_sio.py	Sat Nov 20 19:52:21 2004
@@ -0,0 +1,673 @@
+"""Unit tests for sio (new standard I/O)."""
+
+import os
+import time
+import tempfile
+import unittest
+
+import sio
+
+class TestSource(object):
+
+    def __init__(self, packets):
+        for x in packets:
+            assert x
+        self.orig_packets = list(packets)
+        self.packets = list(packets)
+        self.pos = 0
+
+    def tell(self):
+        return self.pos
+
+    def seek(self, offset, whence=0):
+        if whence == 1:
+            offset += self.pos
+        elif whence == 2:
+            for packet in self.orig_packets:
+                offset += len(packet)
+        else:
+            assert whence == 0
+        self.packets = list(self.orig_packets)
+        self.pos = 0
+        while self.pos < offset:
+            data = self.read(offset - self.pos)
+            if not data:
+                break
+        assert self.pos == offset
+
+    def read(self, n):
+        try:
+            data = self.packets.pop(0)
+        except IndexError:
+            return ""
+        if len(data) > n:
+            data, rest = data[:n], data[n:]
+            self.packets.insert(0, rest)
+        self.pos += len(data)
+        return data
+
+    def close(self):
+        pass
+
+class TestReader(object):
+
+    def __init__(self, packets):
+        for x in packets:
+            assert x
+        self.orig_packets = list(packets)
+        self.packets = list(packets)
+        self.pos = 0
+
+    def tell(self):
+        return self.pos
+
+    def seek(self, offset, whence=0):
+        if whence == 1:
+            offset += self.pos
+        elif whence == 2:
+            for packet in self.orig_packets:
+                offset += len(packet)
+        else:
+            assert whence == 0
+        self.packets = list(self.orig_packets)
+        self.pos = 0
+        while self.pos < offset:
+            data = self.read(offset - self.pos)
+            if not data:
+                break
+        assert self.pos == offset
+
+    def read(self, n):
+        try:
+            data = self.packets.pop(0)
+        except IndexError:
+            return ""
+        if len(data) > n:
+            data, rest = data[:n], data[n:]
+            self.packets.insert(0, rest)
+        self.pos += len(data)
+        return data
+
+    def close(self):
+        pass
+
+class TestWriter(object):
+
+    def __init__(self):
+        self.buf = ""
+        self.pos = 0
+
+    def write(self, data):
+        if self.pos >= len(self.buf):
+            self.buf += "\0" * (self.pos - len(self.buf)) + data
+            self.pos = len(self.buf)
+        else:
+            self.buf = (self.buf[:self.pos] + data +
+                        self.buf[self.pos + len(data):])
+            self.pos += len(data)
+
+    def tell(self):
+        return self.pos
+
+    def seek(self, offset, whence=0):
+        if whence == 0:
+            pass
+        elif whence == 1:
+            offset += self.pos
+        elif whence == 2:
+            offset += len(self.buf)
+        else:
+            raise ValueError, "whence should be 0, 1 or 2"
+        if offset < 0:
+            offset = 0
+        self.pos = offset
+
+    def close(self):
+        pass
+    
+class BufferingInputStreamTests(unittest.TestCase):
+
+    packets = ["a", "b", "\n", "def", "\nxy\npq\nuv", "wx"]
+    lines = ["ab\n", "def\n", "xy\n", "pq\n", "uvwx"]
+
+    def makeStream(self, tell=False, seek=False, bufsize=None):
+        base = TestSource(self.packets)
+        if not tell:
+            base.tell = None
+        if not seek:
+            base.seek = None
+        return sio.BufferingInputStream(base, bufsize)
+
+    def test_readline(self):
+        file = self.makeStream()
+        self.assertEqual(list(iter(file.readline, "")), self.lines)
+
+    def test_readlines(self):
+        # This also tests next() and __iter__()
+        file = self.makeStream()
+        self.assertEqual(file.readlines(), self.lines)
+
+    def test_readlines_small_bufsize(self):
+        file = self.makeStream(bufsize=1)
+        self.assertEqual(list(file), self.lines)
+
+    def test_readall(self):
+        file = self.makeStream()
+        self.assertEqual(file.readall(), "".join(self.lines))
+
+    def test_readall_small_bufsize(self):
+        file = self.makeStream(bufsize=1)
+        self.assertEqual(file.readall(), "".join(self.lines))
+
+    def test_readall_after_readline(self):
+        file = self.makeStream()
+        self.assertEqual(file.readline(), self.lines[0])
+        self.assertEqual(file.readline(), self.lines[1])
+        self.assertEqual(file.readall(), "".join(self.lines[2:]))
+
+    def test_read_1_after_readline(self):
+        file = self.makeStream()
+        self.assertEqual(file.readline(), "ab\n")
+        self.assertEqual(file.readline(), "def\n")
+        blocks = []
+        while 1:
+            block = file.read(1)
+            if not block:
+                break
+            blocks.append(block)
+            self.assertEqual(file.read(0), "")
+        self.assertEqual(blocks, list("".join(self.lines)[7:]))
+
+    def test_read_1(self):
+        file = self.makeStream()
+        blocks = []
+        while 1:
+            block = file.read(1)
+            if not block:
+                break
+            blocks.append(block)
+            self.assertEqual(file.read(0), "")
+        self.assertEqual(blocks, list("".join(self.lines)))
+
+    def test_read_2(self):
+        file = self.makeStream()
+        blocks = []
+        while 1:
+            block = file.read(2)
+            if not block:
+                break
+            blocks.append(block)
+            self.assertEqual(file.read(0), "")
+        self.assertEqual(blocks, ["ab", "\nd", "ef", "\nx", "y\n", "pq",
+                                  "\nu", "vw", "x"])
+
+    def test_read_4(self):
+        file = self.makeStream()
+        blocks = []
+        while 1:
+            block = file.read(4)
+            if not block:
+                break
+            blocks.append(block)
+            self.assertEqual(file.read(0), "")
+        self.assertEqual(blocks, ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"])
+        
+    def test_read_4_after_readline(self):
+        file = self.makeStream()
+        self.assertEqual(file.readline(), "ab\n")
+        self.assertEqual(file.readline(), "def\n")
+        blocks = [file.read(4)]
+        while 1:
+            block = file.read(4)
+            if not block:
+                break
+            blocks.append(block)
+            self.assertEqual(file.read(0), "")
+        self.assertEqual(blocks, ["xy\np", "q\nuv", "wx"])
+
+    def test_read_4_small_bufsize(self):
+        file = self.makeStream(bufsize=1)
+        blocks = []
+        while 1:
+            block = file.read(4)
+            if not block:
+                break
+            blocks.append(block)
+        self.assertEqual(blocks, ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"])
+
+    def test_tell_1(self):
+        file = self.makeStream(tell=True)
+        pos = 0
+        while 1:
+            self.assertEqual(file.tell(), pos)
+            n = len(file.read(1))
+            if not n:
+                break
+            pos += n
+
+    def test_tell_1_after_readline(self):
+        file = self.makeStream(tell=True)
+        pos = 0
+        pos += len(file.readline())
+        self.assertEqual(file.tell(), pos)
+        pos += len(file.readline())
+        self.assertEqual(file.tell(), pos)
+        while 1:
+            self.assertEqual(file.tell(), pos)
+            n = len(file.read(1))
+            if not n:
+                break
+            pos += n
+
+    def test_tell_2(self):
+        file = self.makeStream(tell=True)
+        pos = 0
+        while 1:
+            self.assertEqual(file.tell(), pos)
+            n = len(file.read(2))
+            if not n:
+                break
+            pos += n
+
+    def test_tell_4(self):
+        file = self.makeStream(tell=True)
+        pos = 0
+        while 1:
+            self.assertEqual(file.tell(), pos)
+            n = len(file.read(4))
+            if not n:
+                break
+            pos += n
+
+    def test_tell_readline(self):
+        file = self.makeStream(tell=True)
+        pos = 0
+        while 1:
+            self.assertEqual(file.tell(), pos)
+            n = len(file.readline())
+            if not n:
+                break
+            pos += n
+
+    def test_seek(self):
+        file = self.makeStream(tell=True, seek=True)
+        all = file.readall()
+        end = len(all)
+        for readto in range(0, end+1):
+            for seekto in range(0, end+1):
+                for whence in 0, 1, 2:
+                    file.seek(0)
+                    self.assertEqual(file.tell(), 0)
+                    head = file.read(readto)
+                    self.assertEqual(head, all[:readto])
+                    if whence == 1:
+                        offset = seekto - readto
+                    elif whence == 2:
+                        offset = seekto - end
+                    else:
+                        offset = seekto
+                    file.seek(offset, whence)
+                    here = file.tell()
+                    self.assertEqual(here, seekto)
+                    rest = file.readall()
+                    self.assertEqual(rest, all[seekto:])
+
+    def test_seek_noseek(self):
+        file = self.makeStream()
+        all = file.readall()
+        end = len(all)
+        for readto in range(0, end+1):
+            for seekto in range(readto, end+1):
+                for whence in 1, 2:
+                    file = self.makeStream()
+                    head = file.read(readto)
+                    self.assertEqual(head, all[:readto])
+                    if whence == 1:
+                        offset = seekto - readto
+                    elif whence == 2:
+                        offset = seekto - end
+                    file.seek(offset, whence)
+                    rest = file.readall()
+                    self.assertEqual(rest, all[seekto:])
+
+class BufferingOutputStreamTests(unittest.TestCase):
+
+    def test_write(self):
+        base = TestWriter()
+        filter = sio.BufferingOutputStream(base, 4)
+        filter.write("123")
+        self.assertEqual(base.buf, "")
+        self.assertEquals(filter.tell(), 3)
+        filter.write("456")
+        self.assertEqual(base.buf, "1234")
+        filter.write("789ABCDEF")
+        self.assertEqual(base.buf, "123456789ABC")
+        filter.write("0123")
+        self.assertEqual(base.buf, "123456789ABCDEF0")
+        self.assertEquals(filter.tell(), 19)
+        filter.close()
+        self.assertEqual(base.buf, "123456789ABCDEF0123")
+
+    def test_write_seek(self):
+        base = TestWriter()
+        filter = sio.BufferingOutputStream(base, 4)
+        filter.write("x"*6)
+        filter.seek(3)
+        filter.write("y"*2)
+        filter.close()
+        self.assertEqual(base.buf, "x"*3 + "y"*2 + "x"*1)
+
+class LineBufferingOutputStreamTests(unittest.TestCase):
+
+    def test_write(self):
+        base = TestWriter()
+        filter = sio.LineBufferingOutputStream(base)
+        filter.bufsize = 4 # More handy for testing than the default
+        filter.write("123")
+        self.assertEqual(base.buf, "")
+        self.assertEquals(filter.tell(), 3)
+        filter.write("456")
+        self.assertEqual(base.buf, "1234")
+        filter.write("789ABCDEF\n")
+        self.assertEqual(base.buf, "123456789ABCDEF\n")
+        filter.write("0123")
+        self.assertEqual(base.buf, "123456789ABCDEF\n0123")
+        self.assertEquals(filter.tell(), 20)
+        filter.close()
+        self.assertEqual(base.buf, "123456789ABCDEF\n0123")
+
+    def xtest_write_seek(self):
+        base = TestWriter()
+        filter = sio.BufferingOutputStream(base, 4)
+        filter.write("x"*6)
+        filter.seek(3)
+        filter.write("y"*2)
+        filter.close()
+        self.assertEqual(base.buf, "x"*3 + "y"*2 + "x"*1)
+
+class CRLFFilterTests(unittest.TestCase):
+
+    def test_filter(self):
+        packets = ["abc\ndef\rghi\r\nxyz\r", "123\r", "\n456"]
+        expected = ["abc\ndef\nghi\nxyz\n", "123\n", "456"]
+        crlf = sio.CRLFFilter(TestSource(packets))
+        blocks = []
+        while 1:
+            block = crlf.read(100)
+            if not block:
+                break
+            blocks.append(block)
+        self.assertEqual(blocks, expected)
+
+class MMapFileTests(BufferingInputStreamTests):
+
+    tfn = None
+
+    def tearDown(self):
+        tfn = self.tfn
+        if tfn:
+            self.tfn = None
+            try:
+                os.remove(tfn)
+            except os.error, msg:
+                print "can't remove %s: %s" % (tfn, msg)
+
+    def makeStream(self, tell=None, seek=None, bufsize=None, mode="r"):
+        self.tfn = tempfile.mktemp()
+        f = open(self.tfn, "wb")
+        f.writelines(self.packets)
+        f.close()
+        return sio.MMapFile(self.tfn, mode)
+
+    def test_write(self):
+        if os.name == "posix":
+            return # write() does't work on Unix :-(
+        file = self.makeStream(mode="w")
+        file.write("BooHoo\n")
+        file.write("Barf\n")
+        file.writelines(["a\n", "b\n", "c\n"])
+        self.assertEqual(file.tell(), len("BooHoo\nBarf\na\nb\nc\n"))
+        file.seek(0)
+        self.assertEqual(file.read(), "BooHoo\nBarf\na\nb\nc\n")
+        file.seek(0)
+        self.assertEqual(file.readlines(),
+                         ["BooHoo\n", "Barf\n", "a\n", "b\n", "c\n"])
+        self.assertEqual(file.tell(), len("BooHoo\nBarf\na\nb\nc\n"))
+
+class TextInputFilterTests(unittest.TestCase):
+
+    packets = [
+        "foo\r",
+        "bar\r",
+        "\nfoo\r\n",
+        "abc\ndef\rghi\r\nxyz",
+        "\nuvw\npqr\r",
+        "\n",
+        "abc\n",
+        ]
+    expected = [
+        ("foo\n", 4),
+        ("bar\n", 9),
+        ("foo\n", 14),
+        ("abc\ndef\nghi\nxyz", 30),
+        ("\nuvw\npqr\n", 40),
+        ("abc\n", 44),
+        ("", 44),
+        ("", 44),
+        ]
+
+    expected_with_tell = [
+        ("foo\n", 4),
+        ("b", 5),
+        ("ar\n", 9),
+        ("foo\n", 14),
+        ("abc\ndef\nghi\nxyz", 30),
+        ("\nuvw\npqr\n", 40),
+        ("abc\n", 44),
+        ("", 44),
+        ("", 44),
+        ]
+
+    expected_newlines = [
+        (["abcd"], [None]),
+        (["abcd\n"], ["\n"]),
+        (["abcd\r\n"],["\r\n"]),
+        (["abcd\r"],[None]), # wrong, but requires precognition to fix
+        (["abcd\r", "\nefgh"], [None, "\r\n"]),
+        (["abcd", "\nefg\r", "hij", "k\r\n"], [None, "\n", ("\r", "\n"),
+                                               ("\r", "\n", "\r\n")]),
+        (["abcd", "\refg\r", "\nhij", "k\n"], [None, "\r", ("\r", "\r\n"),
+                                               ("\r", "\n", "\r\n")])
+        ]
+
+    def test_read(self):
+        base = TestReader(self.packets)
+        filter = sio.TextInputFilter(base)
+        for data, pos in self.expected:
+            self.assertEqual(filter.read(100), data)
+
+    def test_read_tell(self):
+        base = TestReader(self.packets)
+        filter = sio.TextInputFilter(base)
+        for data, pos in self.expected_with_tell:
+            self.assertEqual(filter.read(100), data)
+            self.assertEqual(filter.tell(), pos)
+            self.assertEqual(filter.tell(), pos) # Repeat the tell() !
+
+    def test_seek(self):
+        base = TestReader(self.packets)
+        filter = sio.TextInputFilter(base)
+        sofar = ""
+        pairs = []
+        while True:
+            pairs.append((sofar, filter.tell()))
+            c = filter.read(1)
+            if not c:
+                break
+            self.assertEqual(len(c), 1)
+            sofar += c
+        all = sofar
+        for i in range(len(pairs)):
+            sofar, pos = pairs[i]
+            filter.seek(pos)
+            self.assertEqual(filter.tell(), pos)
+            self.assertEqual(filter.tell(), pos)
+            bufs = [sofar]
+            while True:
+                data = filter.read(100)
+                if not data:
+                    self.assertEqual(filter.read(100), "")
+                    break
+                bufs.append(data)
+            self.assertEqual("".join(bufs), all)
+            
+    def test_newlines_attribute(self):
+
+        for packets, expected in self.expected_newlines:
+            base = TestReader(packets)
+            filter = sio.TextInputFilter(base)
+            for e in expected:
+                filter.read(100)
+                self.assertEquals(filter.newlines, e)
+
+class TextOutputFilterTests(unittest.TestCase):
+
+    def test_write_nl(self):
+        base = TestWriter()
+        filter = sio.TextOutputFilter(base, linesep="\n")
+        filter.write("abc")
+        filter.write("def\npqr\nuvw")
+        filter.write("\n123\n")
+        self.assertEqual(base.buf, "abcdef\npqr\nuvw\n123\n")
+
+    def test_write_cr(self):
+        base = TestWriter()
+        filter = sio.TextOutputFilter(base, linesep="\r")
+        filter.write("abc")
+        filter.write("def\npqr\nuvw")
+        filter.write("\n123\n")
+        self.assertEqual(base.buf, "abcdef\rpqr\ruvw\r123\r")
+
+    def test_write_crnl(self):
+        base = TestWriter()
+        filter = sio.TextOutputFilter(base, linesep="\r\n")
+        filter.write("abc")
+        filter.write("def\npqr\nuvw")
+        filter.write("\n123\n")
+        self.assertEqual(base.buf, "abcdef\r\npqr\r\nuvw\r\n123\r\n")
+
+    def test_write_tell_nl(self):
+        base = TestWriter()
+        filter = sio.TextOutputFilter(base, linesep="\n")
+        filter.write("xxx")
+        self.assertEqual(filter.tell(), 3)
+        filter.write("\nabc\n")
+        self.assertEqual(filter.tell(), 8)
+
+    def test_write_tell_cr(self):
+        base = TestWriter()
+        filter = sio.TextOutputFilter(base, linesep="\r")
+        filter.write("xxx")
+        self.assertEqual(filter.tell(), 3)
+        filter.write("\nabc\n")
+        self.assertEqual(filter.tell(), 8)
+
+    def test_write_tell_crnl(self):
+        base = TestWriter()
+        filter = sio.TextOutputFilter(base, linesep="\r\n")
+        filter.write("xxx")
+        self.assertEqual(filter.tell(), 3)
+        filter.write("\nabc\n")
+        self.assertEqual(filter.tell(), 10)
+
+    def test_write_seek(self):
+        base = TestWriter()
+        filter = sio.TextOutputFilter(base, linesep="\n")
+        filter.write("x"*100)
+        filter.seek(50)
+        filter.write("y"*10)
+        self.assertEqual(base.buf, "x"*50 + "y"*10 + "x"*40)
+
+class DecodingInputFilterTests(unittest.TestCase):
+
+    def test_read(self):
+        chars = u"abc\xff\u1234\u4321\x80xyz"
+        data = chars.encode("utf8")
+        base = TestReader([data])
+        filter = sio.DecodingInputFilter(base)
+        bufs = []
+        for n in range(1, 11):
+            while 1:
+                c = filter.read(n)
+                self.assertEqual(type(c), unicode)
+                if not c:
+                    break
+                bufs.append(c)
+            self.assertEqual(u"".join(bufs), chars)
+
+class EncodingOutputFilterTests(unittest.TestCase):
+
+    def test_write(self):
+        chars = u"abc\xff\u1234\u4321\x80xyz"
+        data = chars.encode("utf8")
+        for n in range(1, 11):
+            base = TestWriter()
+            filter = sio.EncodingOutputFilter(base)
+            pos = 0
+            while 1:
+                c = chars[pos:pos+n]
+                if not c:
+                    break
+                pos += len(c)
+                filter.write(c)
+            self.assertEqual(base.buf, data)
+
+# Speed test
+
+FN = "BIG"
+
+def timeit(fn=FN, opener=sio.MMapFile):
+    f = opener(fn, "r")
+    lines = bytes = 0
+    t0 = time.clock()
+    for line in f:
+        lines += 1
+        bytes += len(line)
+    t1 = time.clock()
+    print "%d lines (%d bytes) in %.3f seconds for %s" % (
+        lines, bytes, t1-t0, opener.__name__)
+
+def speed_main():
+    def diskopen(fn, mode):
+        base = sio.DiskFile(fn, mode)
+        return sio.BufferingInputStream(base)
+    timeit(opener=diskopen)
+    timeit(opener=sio.MMapFile)
+    timeit(opener=open)
+
+# Functional test
+
+def functional_main():
+    f = sio.DiskFile("sio.py")
+    f = sio.DecodingInputFilter(f)
+    f = sio.TextInputFilter(f)
+    f = sio.BufferingInputStream(f)
+    for i in range(10):
+        print repr(f.readline())
+
+def makeSuite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(BufferingInputStreamTests))
+    suite.addTest(unittest.makeSuite(BufferingOutputStreamTests))
+    suite.addTest(unittest.makeSuite(LineBufferingOutputStreamTests))
+    suite.addTest(unittest.makeSuite(CRLFFilterTests))
+    suite.addTest(unittest.makeSuite(MMapFileTests))
+    suite.addTest(unittest.makeSuite(TextInputFilterTests))
+    suite.addTest(unittest.makeSuite(TextOutputFilterTests))
+    suite.addTest(unittest.makeSuite(DecodingInputFilterTests))
+    suite.addTest(unittest.makeSuite(EncodingOutputFilterTests))
+
+    return suite
+
+if __name__ == "__main__":
+    unittest.TextTestRunner().run(makeSuite())



More information about the Pypy-commit mailing list