[pypy-svn] r34132 - pypy/dist/pypy/rlib/test
cfbolz at codespeak.net
cfbolz at codespeak.net
Fri Nov 3 17:53:03 CET 2006
Author: cfbolz
Date: Fri Nov 3 17:52:59 2006
New Revision: 34132
Modified:
pypy/dist/pypy/rlib/test/test_streamio.py
Log:
(cfbolz, arigo around)
convert more tests, which pass immediately
Modified: pypy/dist/pypy/rlib/test/test_streamio.py
==============================================================================
--- pypy/dist/pypy/rlib/test/test_streamio.py (original)
+++ pypy/dist/pypy/rlib/test/test_streamio.py Fri Nov 3 17:52:59 2006
@@ -34,8 +34,7 @@
self.pos = 0
while self.pos < offset:
data = self.read(offset - self.pos)
- if not data:
- break
+ assert data
assert self.pos == offset
def read(self, n):
@@ -72,8 +71,10 @@
self.buf += "\0" * (self.pos - len(self.buf)) + data
self.pos = len(self.buf)
else:
- self.buf = (self.buf[:self.pos] + data +
- self.buf[self.pos + len(data):])
+ start = self.pos
+ assert start >= 0
+ self.buf = (self.buf[:start] + data +
+ self.buf[start + len(data):])
self.pos += len(data)
def tell(self):
@@ -109,13 +110,17 @@
class TReaderWriter(TWriter):
def read(self, n=-1):
+ start = self.pos
+ assert start >= 0
if n < 1:
- result = self.buf[self.pos: ]
+ result = self.buf[start: ]
self.pos = len(self.buf)
else:
- if self.pos + n > len(self.buf):
- n = len(self.buf) - self.pos
- result = self.buf[self.pos: self.pos+n]
+ if n > len(self.buf) - start:
+ n = len(self.buf) - start
+ stop = start + n
+ assert stop >= 0
+ result = self.buf[start: stop]
self.pos += n
return result
@@ -136,6 +141,7 @@
base.tell = f
if not seek:
base.seek = f
+
return streamio.BufferingInputStream(base, bufsize)
def test_readline(self):
@@ -179,10 +185,8 @@
def test_read_1_after_readline(self):
file = self.makeStream()
def f():
- if not file.readline() == "ab\n":
- return False
- if not file.readline() == "def\n":
- return False
+ assert file.readline() == "ab\n"
+ assert file.readline() == "def\n"
os.write(1, "3\n")
blocks = []
while 1:
@@ -193,8 +197,7 @@
break
os.write(1, "5\n")
blocks.append(block)
- if not file.read(0) == "":
- return False
+ assert file.read(0) == ""
os.write(1, "6\n")
return "".join(blocks) == "".join(self.lines)[7:]
res = self.interpret(f, [])
@@ -209,8 +212,7 @@
if not block:
break
blocks.append(block)
- if not file.read(0) == "":
- return False
+ assert file.read(0) == ""
return "".join(blocks) == "".join(self.lines)
res = self.interpret(f, [])
assert res
@@ -224,8 +226,7 @@
if not block:
break
blocks.append(block)
- if not file.read(0) == "":
- return False
+ assert file.read(0) == ""
return blocks == ["ab", "\nd", "ef", "\nx", "y\n", "pq",
"\nu", "vw", "x"]
res = self.interpret(f, [])
@@ -240,8 +241,7 @@
if not block:
break
blocks.append(block)
- if not file.read(0) == "":
- return True
+ assert file.read(0) == ""
return blocks == ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"]
res = self.interpret(f, [])
assert res
@@ -251,13 +251,9 @@
def f():
os.write(1, "1\n")
res = file.readline()
- if not res == "ab\n":
- os.write(1, "1f\nxxx" + res + "yyy\n" + str(len(res)) + "\n")
- return False
+ assert res == "ab\n"
os.write(1, "2\n")
- if not file.readline() == "def\n":
- os.write(1, "2f\n")
- return False
+ assert file.readline() == "def\n"
os.write(1, "3\n")
blocks = [file.read(4)]
while 1:
@@ -266,9 +262,7 @@
break
blocks.append(block)
os.write(1, "4\n")
- if not file.read(0) == "":
- os.write(1, "4f\n")
- return False
+ assert file.read(0) == ""
os.write(1, "5\n")
for element in blocks:
os.write(1, element + "XXX\n")
@@ -294,8 +288,7 @@
def f():
pos = 0
while 1:
- if not file.tell() == pos:
- return False
+ assert file.tell() == pos
n = len(file.read(1))
if not n:
break
@@ -309,14 +302,11 @@
def f():
pos = 0
pos += len(file.readline())
- if not file.tell() == pos:
- return False
+ assert file.tell() == pos
pos += len(file.readline())
- if not file.tell() == pos:
- return False
+ assert file.tell() == pos
while 1:
- if not file.tell() == pos:
- return False
+ assert file.tell() == pos
n = len(file.read(1))
if not n:
break
@@ -330,8 +320,7 @@
def f():
pos = 0
while 1:
- if not file.tell() == pos:
- return False
+ assert file.tell() == pos
n = len(file.read(2))
if not n:
break
@@ -345,8 +334,7 @@
def f():
pos = 0
while 1:
- if not file.tell() == pos:
- return False
+ assert file.tell() == pos
n = len(file.read(4))
if not n:
break
@@ -360,8 +348,7 @@
def f():
pos = 0
while 1:
- if not file.tell() == pos:
- return False
+ assert file.tell() == pos
n = len(file.readline())
if not n:
break
@@ -379,11 +366,9 @@
for seekto in range(0, end+1):
for whence in [0, 1, 2]:
file.seek(0)
- if not file.tell() == 0:
- return False
+ assert file.tell() == 0
head = file.read(readto)
- if not head == all[:readto]:
- return False
+ assert head == all[:readto]
if whence == 1:
offset = seekto - readto
elif whence == 2:
@@ -392,11 +377,9 @@
offset = seekto
file.seek(offset, whence)
here = file.tell()
- if not here == seekto:
- return False
+ assert here == seekto
rest = file.readall()
- if not rest == all[seekto:]:
- return False
+ assert rest == all[seekto:]
return True
res = self.interpret(f, [])
assert res
@@ -412,8 +395,7 @@
base = TSource(self.packets)
file = streamio.BufferingInputStream(base)
head = file.read(readto)
- if not head == all[:readto]:
- return False
+ assert head == all[:readto]
offset = 42 # for the flow space
if whence == 1:
offset = seekto - readto
@@ -624,51 +606,55 @@
assert file.tell() == len("BooHoo\nBarf\na\nb\nc\n")
-
-
-
-class TestBufferingInputOutputStreamTests:
+class BaseTestBufferingInputOutputStreamTests(BaseRtypingTest):
def test_write(self):
import sys
base = TReaderWriter()
filter = streamio.BufferingInputStream(
streamio.BufferingOutputStream(base, 4), 4)
- filter.write("123456789")
- for chunk in base.chunks:
- assert len(chunk[1]) >= 4
- s = filter.read(sys.maxint)
- assert base.buf == "123456789"
- base.chunks = []
- filter.write("abc")
- assert not base.chunks
- s = filter.read(sys.maxint)
- assert base.buf == "123456789abc"
- base.chunks = []
- filter.write("012")
- assert not base.chunks
- filter.seek(4, 0)
- assert base.buf == "123456789abc012"
- assert filter.read(3) == "567"
- filter.write('x')
- filter.flush()
- assert base.buf == "1234567x9abc012"
+ def f():
+ filter.write("123456789")
+ for chunk in base.chunks:
+ assert len(chunk[1]) >= 4
+ s = filter.read(sys.maxint)
+ assert base.buf == "123456789"
+ base.chunks = []
+ filter.write("abc")
+ assert not base.chunks
+ s = filter.read(sys.maxint)
+ assert base.buf == "123456789abc"
+ base.chunks = []
+ filter.write("012")
+ assert not base.chunks
+ filter.seek(4, 0)
+ assert base.buf == "123456789abc012"
+ assert filter.read(3) == "567"
+ filter.write('x')
+ filter.flush()
+ assert base.buf == "1234567x9abc012"
+ self.interpret(f, [])
def test_write_seek_beyond_end(self):
"Linux behaviour. May be different on other platforms."
base = TReaderWriter()
filter = streamio.BufferingInputStream(
streamio.BufferingOutputStream(base, 4), 4)
- filter.seek(3)
- filter.write("y"*2)
- filter.close()
- assert base.buf == "\0"*3 + "y"*2
-
-
-
-
+ def f():
+ filter.seek(3)
+ filter.write("y"*2)
+ filter.close()
+ assert base.buf == "\0"*3 + "y"*2
+ self.interpret(f, [])
+class TestBufferingInputOutputStreamTests(
+ BaseTestBufferingInputOutputStreamTests):
+ def interpret(self, func, args):
+ return func(*args)
+class TestBufferingInputOutputStreamTestsLLinterp(
+ BaseTestBufferingInputOutputStreamTests, LLRtypeMixin):
+ pass
class TestTextInputFilter:
More information about the Pypy-commit mailing list