[pypy-svn] r34132 - pypy/dist/pypy/rlib/test

cfbolz at codespeak.net cfbolz at codespeak.net
Fri Nov 3 17:53:03 CET 2006


Author: cfbolz
Date: Fri Nov  3 17:52:59 2006
New Revision: 34132

Modified:
   pypy/dist/pypy/rlib/test/test_streamio.py
Log:
(cfbolz, arigo around)
convert more tests that pass immediately


Modified: pypy/dist/pypy/rlib/test/test_streamio.py
==============================================================================
--- pypy/dist/pypy/rlib/test/test_streamio.py	(original)
+++ pypy/dist/pypy/rlib/test/test_streamio.py	Fri Nov  3 17:52:59 2006
@@ -34,8 +34,7 @@
         self.pos = 0
         while self.pos < offset:
             data = self.read(offset - self.pos)
-            if not data:
-                break
+            assert data
         assert self.pos == offset
 
     def read(self, n):
@@ -72,8 +71,10 @@
             self.buf += "\0" * (self.pos - len(self.buf)) + data
             self.pos = len(self.buf)
         else:
-            self.buf = (self.buf[:self.pos] + data +
-                        self.buf[self.pos + len(data):])
+            start = self.pos
+            assert start >= 0
+            self.buf = (self.buf[:start] + data +
+                        self.buf[start + len(data):])
             self.pos += len(data)
 
     def tell(self):
@@ -109,13 +110,17 @@
 class TReaderWriter(TWriter):
 
     def read(self, n=-1):
+        start = self.pos
+        assert start >= 0
         if n < 1:
-            result = self.buf[self.pos: ]
+            result = self.buf[start: ]
             self.pos = len(self.buf)
         else:
-            if self.pos + n > len(self.buf):
-                n = len(self.buf) - self.pos
-            result = self.buf[self.pos: self.pos+n]
+            if n > len(self.buf) - start:
+                n = len(self.buf) - start
+            stop = start + n
+            assert stop >= 0
+            result = self.buf[start: stop]
             self.pos += n
         return result
     
@@ -136,6 +141,7 @@
             base.tell = f
         if not seek:
             base.seek = f
+
         return streamio.BufferingInputStream(base, bufsize)
 
     def test_readline(self):
@@ -179,10 +185,8 @@
     def test_read_1_after_readline(self):
         file = self.makeStream()
         def f():
-            if not file.readline() == "ab\n":
-                return False
-            if not file.readline() == "def\n":
-                return False
+            assert file.readline() == "ab\n"
+            assert file.readline() == "def\n"
             os.write(1, "3\n")
             blocks = []
             while 1:
@@ -193,8 +197,7 @@
                     break
                 os.write(1, "5\n")
                 blocks.append(block)
-                if not file.read(0) == "":
-                    return False
+                assert file.read(0) == ""
             os.write(1, "6\n")
             return "".join(blocks) == "".join(self.lines)[7:]
         res = self.interpret(f, [])
@@ -209,8 +212,7 @@
                 if not block:
                     break
                 blocks.append(block)
-                if not file.read(0) == "":
-                    return False
+                assert file.read(0) == ""
             return "".join(blocks) == "".join(self.lines)
         res = self.interpret(f, [])
         assert res
@@ -224,8 +226,7 @@
                 if not block:
                     break
                 blocks.append(block)
-                if not file.read(0) == "":
-                    return False
+                assert file.read(0) == ""
             return blocks == ["ab", "\nd", "ef", "\nx", "y\n", "pq",
                               "\nu", "vw", "x"]
         res = self.interpret(f, [])
@@ -240,8 +241,7 @@
                 if not block:
                     break
                 blocks.append(block)
-                if not file.read(0) == "":
-                    return True
+                assert file.read(0) == ""
             return blocks == ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"]
         res = self.interpret(f, [])
         assert res
@@ -251,13 +251,9 @@
         def f():
             os.write(1, "1\n")
             res = file.readline()
-            if not res == "ab\n":
-                os.write(1, "1f\nxxx" + res + "yyy\n" + str(len(res)) + "\n")
-                return False
+            assert res == "ab\n"
             os.write(1, "2\n")
-            if not file.readline() == "def\n":
-                os.write(1, "2f\n")
-                return False
+            assert file.readline() == "def\n"
             os.write(1, "3\n")
             blocks = [file.read(4)]
             while 1:
@@ -266,9 +262,7 @@
                     break
                 blocks.append(block)
                 os.write(1, "4\n")
-                if not file.read(0) == "":
-                    os.write(1, "4f\n")
-                    return False
+                assert file.read(0) == ""
             os.write(1, "5\n")
             for element in blocks:
                 os.write(1, element + "XXX\n")
@@ -294,8 +288,7 @@
         def f():
             pos = 0
             while 1:
-                if not file.tell() == pos:
-                    return False
+                assert file.tell() == pos
                 n = len(file.read(1))
                 if not n:
                     break
@@ -309,14 +302,11 @@
         def f():
             pos = 0
             pos += len(file.readline())
-            if not file.tell() == pos:
-                return False
+            assert file.tell() == pos
             pos += len(file.readline())
-            if not file.tell() == pos:
-                return False
+            assert file.tell() == pos
             while 1:
-                if not file.tell() == pos:
-                    return False
+                assert file.tell() == pos
                 n = len(file.read(1))
                 if not n:
                     break
@@ -330,8 +320,7 @@
         def f():
             pos = 0
             while 1:
-                if not file.tell() == pos:
-                    return False
+                assert file.tell() == pos
                 n = len(file.read(2))
                 if not n:
                     break
@@ -345,8 +334,7 @@
         def f():
             pos = 0
             while 1:
-                if not file.tell() == pos:
-                    return False
+                assert file.tell() == pos
                 n = len(file.read(4))
                 if not n:
                     break
@@ -360,8 +348,7 @@
         def f():
             pos = 0
             while 1:
-                if not file.tell() == pos:
-                    return False
+                assert file.tell() == pos
                 n = len(file.readline())
                 if not n:
                     break
@@ -379,11 +366,9 @@
                 for seekto in range(0, end+1):
                     for whence in [0, 1, 2]:
                         file.seek(0)
-                        if not file.tell() == 0:
-                            return False
+                        assert file.tell() == 0
                         head = file.read(readto)
-                        if not head == all[:readto]:
-                            return False
+                        assert head == all[:readto]
                         if whence == 1:
                             offset = seekto - readto
                         elif whence == 2:
@@ -392,11 +377,9 @@
                             offset = seekto
                         file.seek(offset, whence)
                         here = file.tell()
-                        if not here == seekto:
-                            return False
+                        assert here == seekto
                         rest = file.readall()
-                        if not rest == all[seekto:]:
-                            return False
+                        assert rest == all[seekto:]
             return True
         res = self.interpret(f, [])
         assert res
@@ -412,8 +395,7 @@
                         base = TSource(self.packets)
                         file = streamio.BufferingInputStream(base)
                         head = file.read(readto)
-                        if not head == all[:readto]:
-                            return False
+                        assert head == all[:readto]
                         offset = 42 # for the flow space
                         if whence == 1:
                             offset = seekto - readto
@@ -624,51 +606,55 @@
         assert file.tell() == len("BooHoo\nBarf\na\nb\nc\n")
 
 
-
-
-
-class TestBufferingInputOutputStreamTests:
+class BaseTestBufferingInputOutputStreamTests(BaseRtypingTest):
 
     def test_write(self):
         import sys
         base = TReaderWriter()
         filter = streamio.BufferingInputStream(
                 streamio.BufferingOutputStream(base, 4), 4)
-        filter.write("123456789")
-        for chunk in base.chunks:
-            assert len(chunk[1]) >= 4
-        s = filter.read(sys.maxint)
-        assert base.buf == "123456789"
-        base.chunks = []
-        filter.write("abc")
-        assert not base.chunks
-        s = filter.read(sys.maxint)
-        assert base.buf == "123456789abc"
-        base.chunks = []
-        filter.write("012")
-        assert not base.chunks
-        filter.seek(4, 0)
-        assert base.buf == "123456789abc012"
-        assert filter.read(3) == "567"
-        filter.write('x')
-        filter.flush()
-        assert base.buf == "1234567x9abc012"
+        def f():
+            filter.write("123456789")
+            for chunk in base.chunks:
+                assert len(chunk[1]) >= 4
+            s = filter.read(sys.maxint)
+            assert base.buf == "123456789"
+            base.chunks = []
+            filter.write("abc")
+            assert not base.chunks
+            s = filter.read(sys.maxint)
+            assert base.buf == "123456789abc"
+            base.chunks = []
+            filter.write("012")
+            assert not base.chunks
+            filter.seek(4, 0)
+            assert base.buf == "123456789abc012"
+            assert filter.read(3) == "567"
+            filter.write('x')
+            filter.flush()
+            assert base.buf == "1234567x9abc012"
+        self.interpret(f, [])
 
     def test_write_seek_beyond_end(self):
         "Linux behaviour. May be different on other platforms."
         base = TReaderWriter()
         filter = streamio.BufferingInputStream(
             streamio.BufferingOutputStream(base, 4), 4)
-        filter.seek(3)
-        filter.write("y"*2)
-        filter.close()
-        assert base.buf == "\0"*3 + "y"*2
-
-
-
-
+        def f():
+            filter.seek(3)
+            filter.write("y"*2)
+            filter.close()
+            assert base.buf == "\0"*3 + "y"*2
+        self.interpret(f, [])
 
+class TestBufferingInputOutputStreamTests(
+        BaseTestBufferingInputOutputStreamTests):
+    def interpret(self, func, args):
+        return func(*args)
 
+class TestBufferingInputOutputStreamTestsLLinterp(
+        BaseTestBufferingInputOutputStreamTests, LLRtypeMixin):
+    pass
 
 
 class TestTextInputFilter:



More information about the Pypy-commit mailing list