[pypy-commit] pypy streamio-bufinput: Revamped BufferingInputStream to be faster.
justinpeel
noreply at buildbot.pypy.org
Thu Jul 28 21:55:34 CEST 2011
Author: Justin Peel <notmuchtotell at gmail.com>
Branch: streamio-bufinput
Changeset: r46066:4d206523194f
Date: 2011-07-28 13:55 -0600
http://bitbucket.org/pypy/pypy/changeset/4d206523194f/
Log: Revamped BufferingInputStream to be faster.
diff --git a/pypy/rlib/streamio.py b/pypy/rlib/streamio.py
--- a/pypy/rlib/streamio.py
+++ b/pypy/rlib/streamio.py
@@ -496,29 +496,24 @@
if bufsize == -1: # Get default from the class
bufsize = self.bufsize
self.bufsize = bufsize # buffer size (hint only)
- self.lines = [] # ready-made lines (sans "\n")
- self.buf = "" # raw data (may contain "\n")
- # Invariant: readahead == "\n".join(self.lines + [self.buf])
- # self.lines contains no "\n"
- # self.buf may contain "\n"
+ self.buf = "" # raw data
+ self.pos = 0
def flush_buffers(self):
- if self.lines or self.buf:
+ if self.buf:
try:
self.do_seek(self.tell(), 0)
except MyNotImplementedError:
pass
else:
- self.lines = []
self.buf = ""
+ self.pos = 0
def tell(self):
- bytes = self.do_tell() # This may fail
- offset = len(self.buf)
- for line in self.lines:
- offset += len(line) + 1
- assert bytes >= offset #, (locals(), self.__dict__)
- return bytes - offset
+ tellpos = self.do_tell() # This may fail
+ offset = len(self.buf) - self.pos
+ assert tellpos >= offset #, (locals(), self.__dict__)
+ return tellpos - offset
def seek(self, offset, whence):
# This may fail on the do_seek() or do_tell() call.
@@ -526,32 +521,25 @@
# Nor on a seek to the very end.
if whence == 0:
self.do_seek(offset, 0)
- self.lines = []
self.buf = ""
+ self.pos = 0
return
if whence == 1:
+ currentsize = len(self.buf) - self.pos
if offset < 0:
- self.do_seek(self.tell() + offset, 0)
- self.lines = []
- self.buf = ""
+ if self.pos + offset >= 0:
+ self.pos += offset
+ else:
+ self.do_seek(self.tell() + offset, 0)
+ self.pos = 0
+ self.buf = ""
return
- while self.lines:
- line = self.lines[-1]
- if offset <= len(line):
- intoffset = intmask(offset)
- assert intoffset >= 0
- self.lines[-1] = line[intoffset:]
- return
- offset -= len(self.lines[-1]) - 1
- self.lines.pop()
- assert not self.lines
- if offset <= len(self.buf):
- intoffset = intmask(offset)
- assert intoffset >= 0
- self.buf = self.buf[intoffset:]
+ elif offset <= currentsize:
+ self.pos += offset
return
- offset -= len(self.buf)
self.buf = ""
+ self.pos = 0
+ offset -= currentsize
try:
self.do_seek(offset, 1)
except MyNotImplementedError:
@@ -559,187 +547,135 @@
self.read(intoffset)
return
if whence == 2:
- try:
- self.do_seek(offset, 2)
- except MyNotImplementedError:
- pass
- else:
- self.lines = []
- self.buf = ""
- return
+ self.do_seek(offset, 2)
+ self.pos = 0
+ self.buf = ""
+ # We'll comment all of this for now unless someone really wants
+ # something like it
+ #try:
+ # self.do_seek(offset, 2)
+ #except MyNotImplementedError:
+ # pass
+ #else:
+ # self.pos = 0
+ # self.buf = ""
+ # return
# Skip relative to EOF by reading and saving only just as
# much as needed
- intoffset = offset2int(offset)
- self.lines.reverse()
- data = "\n".join(self.lines + [self.buf])
- total = len(data)
- buffers = [data]
- self.lines = []
- self.buf = ""
- while 1:
- data = self.do_read(self.bufsize)
- if not data:
- break
- buffers.append(data)
- total += len(data)
- while buffers and total >= len(buffers[0]) - intoffset:
- total -= len(buffers[0])
- del buffers[0]
- cutoff = total + intoffset
- if cutoff < 0:
- raise StreamError("cannot seek back")
- if buffers:
- buffers[0] = buffers[0][cutoff:]
- self.buf = "".join(buffers)
- self.lines = []
- return
+ #intoffset = offset2int(offset)
+ #self.lines.reverse()
+ #data = "\n".join(self.lines + [self.buf])
+ #total = len(data)
+ #buffers = [data]
+ #self.lines = []
+ #self.buf = ""
+ #while 1:
+ #data = self.do_read(self.bufsize)
+ #if not data:
+ #break
+ #buffers.append(data)
+ #total += len(data)
+ #while buffers and total >= len(buffers[0]) - intoffset:
+ #total -= len(buffers[0])
+ #del buffers[0]
+ #cutoff = total + intoffset
+ #if cutoff < 0:
+ #raise StreamError("cannot seek back")
+ #if buffers:
+ #buffers[0] = buffers[0][cutoff:]
+ #self.buf = "".join(buffers)
+ #self.lines = []
+ #return
raise StreamError("whence should be 0, 1 or 2")
def readall(self):
- self.lines.reverse()
- self.lines.append(self.buf)
- more = ["\n".join(self.lines)]
- self.lines = []
+ pos = self.pos
+ assert pos >= 0
+ chunks = [self.buf[pos:]]
self.buf = ""
+ self.pos = 0
bufsize = self.bufsize
while 1:
data = self.do_read(bufsize)
if not data:
break
- more.append(data)
+ chunks.append(data)
bufsize = min(bufsize*2, self.bigsize)
- return "".join(more)
+ return "".join(chunks)
- def read(self, n):
+ def read(self, n=-1):
assert isinstance(n, int)
- assert n >= 0
- if self.lines:
- # See if this can be satisfied from self.lines[0]
- line = self.lines[-1]
- if len(line) >= n:
- self.lines[-1] = line[n:]
- return line[:n]
-
- # See if this can be satisfied *without exhausting* self.lines
- k = 0
- i = 0
- lgt = len(self.lines)
- for linenum in range(lgt-1,-1,-1):
- line = self.lines[linenum]
- k += len(line)
- if k >= n:
- lines = self.lines[linenum + 1:]
- data = self.lines[linenum]
- cutoff = len(data) - (k-n)
- assert cutoff >= 0
- lines.reverse()
- lines.append(data[:cutoff])
- del self.lines[linenum:]
- self.lines.append(data[cutoff:])
- return "\n".join(lines)
- k += 1
-
- # See if this can be satisfied from self.lines plus self.buf
- if k + len(self.buf) >= n:
- lines = self.lines
- lines.reverse()
- self.lines = []
- cutoff = n - k
- assert cutoff >= 0
- lines.append(self.buf[:cutoff])
- self.buf = self.buf[cutoff:]
- return "\n".join(lines)
-
+ if n < 0:
+ return self.readall()
+ currentsize = len(self.buf) - self.pos
+ start = self.pos
+ assert start >= 0
+ if n <= currentsize:
+ stop = start + n
+ assert stop >= 0
+ result = self.buf[start:stop]
+ self.pos += n
+ return result
else:
- # See if this can be satisfied from self.buf
- data = self.buf
- k = len(data)
- if k >= n:
- cutoff = len(data) - (k-n)
- assert cutoff >= 0
- assert len(data) >= cutoff
- self.buf = data[cutoff:]
- return data[:cutoff]
-
- lines = self.lines
- lines.reverse()
- self.lines = []
- lines.append(self.buf)
- self.buf = ""
- data = "\n".join(lines)
- more = [data]
- k = len(data)
- while k < n:
- data = self.do_read(max(self.bufsize, n-k))
- k += len(data)
- more.append(data)
- if not data:
- break
- cutoff = len(data) - (k-n)
- assert cutoff >= 0
- if len(data) <= cutoff:
- self.buf = ""
- else:
- self.buf = data[cutoff:]
- more[-1] = data[:cutoff]
- return "".join(more)
-
- # read_next_bunch is generally this, version below is slightly faster
- #def _read_next_bunch(self):
- # self.lines = self.buf.split("\n")
- # self.buf = self.lines.pop()
- # self.lines.reverse()
-
- def _read_next_bunch(self):
- numlines = self.buf.count("\n")
- self.lines = [None] * numlines
- last = -1
- num = numlines - 1
- while True:
- start = last + 1
- assert start >= 0
- next = self.buf.find("\n", start)
- if next == -1:
- if last != -1:
- self.buf = self.buf[start:]
- break
- assert next >= 0
- self.lines[num] = self.buf[start:next]
- last = next
- num -= 1
+ chunks = [self.buf[start:]]
+ while 1:
+ self.buf = self.do_read(self.bufsize)
+ if not self.buf:
+ self.pos = 0
+ break
+ currentsize += len(self.buf)
+ if currentsize >= n:
+ self.pos = len(self.buf) - (currentsize - n)
+ stop = self.pos
+ assert stop >= 0
+ chunks.append(self.buf[:stop])
+ break
+ chunks.append(self.buf)
+ return ''.join(chunks)
def readline(self):
- if self.lines:
- return self.lines.pop() + "\n"
-
- # This block is needed because read() can leave self.buf
- # containing newlines
- self._read_next_bunch()
- if self.lines:
- return self.lines.pop() + "\n"
-
- if self.buf:
- buf = [self.buf]
- else:
- buf = []
+ pos = self.pos
+ assert pos >= 0
+ i = self.buf.find("\n", pos)
+ start = self.pos
+ assert start >= 0
+ if i >= 0: # new line found
+ i += 1
+ result = self.buf[start:i]
+ self.pos = i
+ return result
+ temp = self.buf[start:]
+ # read one buffer and most of the time a new line will be found
+ self.buf = self.do_read(self.bufsize)
+ i = self.buf.find("\n")
+ if i >= 0: # new line found
+ i += 1
+ result = temp + self.buf[:i]
+ self.pos = i
+ return result
+ if not self.buf:
+ self.pos = 0
+ return temp
+ # need to keep getting data until we find a new line
+ chunks = [temp, self.buf]
while 1:
self.buf = self.do_read(self.bufsize)
- self._read_next_bunch()
- if self.lines:
- buf.append(self.lines.pop())
- buf.append("\n")
+ if not self.buf:
+ self.pos = 0
break
- if not self.buf:
+ i = self.buf.find("\n")
+ if i >= 0:
+ i += 1
+ chunks.append(self.buf[:i])
+ self.pos = i
break
- buf.append(self.buf)
-
- return "".join(buf)
+ chunks.append(self.buf)
+ return "".join(chunks)
def peek(self):
- if self.lines:
- return self.lines[-1] + "\n"
- else:
- return self.buf
+ pos = self.pos
+ assert pos >= 0
+ return self.buf[pos:]
write = PassThrough("write", flush_buffers=True)
truncate = PassThrough("truncate", flush_buffers=True)
More information about the pypy-commit mailing list