[Python-checkins] r68065 - sandbox/trunk/io-c/test_io.py

antoine.pitrou python-checkins at python.org
Tue Dec 30 16:21:48 CET 2008


Author: antoine.pitrou
Date: Tue Dec 30 16:21:46 2008
New Revision: 68065

Log:
extend and improve tests



Modified:
   sandbox/trunk/io-c/test_io.py

Modified: sandbox/trunk/io-c/test_io.py
==============================================================================
--- sandbox/trunk/io-c/test_io.py	(original)
+++ sandbox/trunk/io-c/test_io.py	Tue Dec 30 16:21:46 2008
@@ -7,7 +7,8 @@
 import threading
 import random
 import unittest
-from itertools import chain, cycle
+from itertools import chain, cycle, count, repeat
+from collections import deque
 from test import support
 
 import codecs
@@ -63,21 +64,37 @@
 
 class MockNonBlockWriterIO(io.RawIOBase):
 
-    def __init__(self, blocking_script):
-        self._blocking_script = list(blocking_script)
+    def __init__(self):
         self._write_stack = []
+        self._blocker_char = None
 
-    def write(self, b):
-        self._write_stack.append(bytes(b))
-        n = self._blocking_script.pop(0)
-        if (n < 0):
-            raise io.BlockingIOError(0, "test blocking", -n)
-        else:
-            return n
-
+    def pop_written(self):
+        s = b"".join(self._write_stack)
+        self._write_stack[:] = []
+        return s
+
+    def block_on(self, char):
+        """Block when a given char is encountered."""
+        self._blocker_char = char
+ 
     def writable(self):
         return True
 
+    def write(self, b):
+        b = bytes(b)
+        n = -1
+        if self._blocker_char:
+            try:
+                n = b.index(self._blocker_char)
+            except ValueError:
+                pass
+            else:
+                self._blocker_char = None
+                self._write_stack.append(b[:n])
+                raise io.BlockingIOError(0, "test blocking", n)
+        self._write_stack.append(b)
+        return len(b)
+
 
 class IOTest(unittest.TestCase):
 
@@ -355,6 +372,25 @@
 
 
 class BufferedReaderTest(unittest.TestCase):
+    
+    def testConstructor(self):
+        rawio = MockRawIO([b"abc"])
+        bufio = io.BufferedReader(rawio)
+        bufio.__init__(rawio)
+        bufio.__init__(rawio, buffer_size=1024)
+        bufio.__init__(rawio, buffer_size=16)
+        self.assertEquals(b"abc", bufio.read())
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+        self.assertRaises(ValueError, bufio.read)
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+        self.assertRaises(ValueError, bufio.read)
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+        self.assertRaises(ValueError, bufio.read)
+        #self.assertRaises((OverflowError, MemoryError, ValueError),
+            #bufio.__init__, rawio, sys.maxsize)
+        rawio = MockRawIO([b"abc"])
+        bufio.__init__(rawio)
+        self.assertEquals(b"abc", bufio.read())
 
     def testRead(self):
         rawio = MockRawIO((b"abc", b"d", b"efg"))
@@ -461,43 +497,100 @@
 
 class BufferedWriterTest(unittest.TestCase):
 
+    def testConstructor(self):
+        rawio = MockRawIO()
+        bufio = io.BufferedWriter(rawio)
+        bufio.__init__(rawio)
+        bufio.__init__(rawio, buffer_size=1024)
+        bufio.__init__(rawio, buffer_size=16)
+        self.assertEquals(3, bufio.write(b"abc"))
+        bufio.flush()
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+        self.assertRaises(ValueError, bufio.write, b"def")
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+        self.assertRaises(ValueError, bufio.write, b"def")
+        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+        self.assertRaises(ValueError, bufio.write, b"def")
+        self.assertRaises((OverflowError, MemoryError, ValueError),
+            bufio.__init__, rawio, sys.maxsize)
+        bufio.__init__(rawio)
+        self.assertEquals(3, bufio.write(b"ghi"))
+        bufio.flush()
+        self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
+
     def testWrite(self):
         # Write to the buffered IO but don't overflow the buffer.
         writer = MockRawIO()
         bufio = io.BufferedWriter(writer, 8)
-
         bufio.write(b"abc")
-
         self.assertFalse(writer._write_stack)
 
     def testWriteOverflow(self):
         writer = MockRawIO()
         bufio = io.BufferedWriter(writer, 8)
+        contents = b"abcdefghijklmnop"
+        for n in range(0, len(contents), 3):
+            bufio.write(contents[n:n+3])
+        flushed = b"".join(writer._write_stack)
+        # At least (total - 7) bytes were implicitly flushed, perhaps more
+        # depending on the implementation.
+        self.assert_(flushed.startswith(contents[:-8]), flushed)
+
+    def check_writes(self, intermediate_func):
+        # Lots of writes, test the flushed output is as expected.
+        contents = bytes(range(256)) * 1000
+        n = 0
+        writer = MockRawIO()
+        bufio = io.BufferedWriter(writer, 13)
+        # Generator of write sizes: repeat each N 15 times then proceed to N+1
+        def gen_sizes():
+            for size in count(1):
+                for i in range(15):
+                    yield size
+        sizes = gen_sizes()
+        while n < len(contents):
+            size = min(next(sizes), len(contents) - n)
+            self.assertEquals(bufio.write(contents[n:n+size]), size)
+            intermediate_func(bufio)
+            n += size
+        bufio.flush()
+        self.assertEquals(contents, b"".join(writer._write_stack))
 
-        bufio.write(b"abc")
-        bufio.write(b"defghijkl")
+    def testWrites(self):
+        self.check_writes(lambda bufio: None)
+    
+    def testWritesAndFlushes(self):
+        self.check_writes(lambda bufio: bufio.flush())
 
-        self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
+    def testWritesAndSeeks(self):
+        self.check_writes(lambda bufio: bufio.seek(0, 1))
 
     def testWriteNonBlocking(self):
-        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
-        bufio = io.BufferedWriter(raw, 8, 16)
+        raw = MockNonBlockWriterIO()
+        bufio = io.BufferedWriter(raw, 8)
 
-        bufio.write(b"asdf")
-        bufio.write(b"asdfa")
-        self.assertEquals(b"asdfasdfa", raw._write_stack[0])
-
-        bufio.write(b"asdfasdfasdf")
-        self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
-        bufio.write(b"asdfasdfasdf")
-        self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
-        self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
-
-        bufio.write(b"asdfasdfasdf")
-
-        # XXX I don't like this test. It relies too heavily on how the
-        # algorithm actually works, which we might change. Refactor
-        # later.
+        self.assertEquals(bufio.write(b"abcd"), 4)
+        self.assertEquals(bufio.write(b"efghi"), 5)
+        # 1 byte will be written, the rest will be buffered
+        raw.block_on(b"k")
+        self.assertEquals(bufio.write(b"jklmnopqr"), 9)
+ 
+        # 4 bytes will be written, 8 will be buffered and the rest will be lost
+        raw.block_on(b"0")
+        try:
+            bufio.write(b"wxyz0123456789")
+        except io.BlockingIOError as e:
+            written = e.characters_written
+        else:
+            self.fail("BlockingIOError should have been raised")
+        self.assertEquals(written, 12)
+        self.assertEquals(raw.pop_written(),
+            b"abcdefghijklmnopqrwxyz")
+ 
+        self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
+        s = raw.pop_written()
+        # Previously buffered bytes were flushed
+        self.assertTrue(s.startswith(b"01234567A"), s)
 
     def testFileno(self):
         rawio = MockRawIO((b"abc", b"d", b"efg"))
@@ -508,16 +601,24 @@
     def testFlush(self):
         writer = MockRawIO()
         bufio = io.BufferedWriter(writer, 8)
-
         bufio.write(b"abc")
         bufio.flush()
-
         self.assertEquals(b"abc", writer._write_stack[0])
 
     def testThreads(self):
-        # BufferedWriter should not raise exceptions or crash
-        # when called from multiple threads.
         try:
+            # Write out many bytes from many threads and test they were
+            # all flushed.
+            N = 1000
+            contents = bytes(range(256)) * N
+            sizes = cycle([1, 19])
+            n = 0
+            queue = deque()
+            while n < len(contents):
+                size = next(sizes)
+                queue.append(contents[n:n+size])
+                n += size
+            del contents
             # We use a real file object because it allows us to
             # exercise situations where the GIL is released before
             # writing the buffer to the raw streams. This is in addition
@@ -528,9 +629,11 @@
                 errors = []
                 def f():
                     try:
-                        # Write enough bytes to flush the buffer
-                        s = b"a" * 19
-                        for i in range(50):
+                        while True:
+                            try:
+                                s = queue.popleft()
+                            except IndexError:
+                                return
                             bufio.write(s)
                     except Exception as e:
                         errors.append(e)
@@ -543,6 +646,11 @@
                     t.join()
                 self.assertFalse(errors,
                     "the following exceptions were caught: %r" % errors)
+                bufio.close()
+            with io.open(support.TESTFN, "rb") as f:
+                s = f.read()
+            for i in range(256):
+                self.assertEquals(s.count(bytes([i])), N)
         finally:
             support.unlink(support.TESTFN)
 


More information about the Python-checkins mailing list