[Python-checkins] r68065 - sandbox/trunk/io-c/test_io.py
antoine.pitrou
python-checkins at python.org
Tue Dec 30 16:21:48 CET 2008
Author: antoine.pitrou
Date: Tue Dec 30 16:21:46 2008
New Revision: 68065
Log:
extend and improve tests
Modified:
sandbox/trunk/io-c/test_io.py
Modified: sandbox/trunk/io-c/test_io.py
==============================================================================
--- sandbox/trunk/io-c/test_io.py (original)
+++ sandbox/trunk/io-c/test_io.py Tue Dec 30 16:21:46 2008
@@ -7,7 +7,8 @@
import threading
import random
import unittest
-from itertools import chain, cycle
+from itertools import chain, cycle, count, repeat
+from collections import deque
from test import support
import codecs
@@ -63,21 +64,37 @@
class MockNonBlockWriterIO(io.RawIOBase):
- def __init__(self, blocking_script):
- self._blocking_script = list(blocking_script)
+ def __init__(self):
self._write_stack = []
+ self._blocker_char = None
- def write(self, b):
- self._write_stack.append(bytes(b))
- n = self._blocking_script.pop(0)
- if (n < 0):
- raise io.BlockingIOError(0, "test blocking", -n)
- else:
- return n
-
+ def pop_written(self):
+ s = b"".join(self._write_stack)
+ self._write_stack[:] = []
+ return s
+
+ def block_on(self, char):
+ """Block when a given char is encountered."""
+ self._blocker_char = char
+
def writable(self):
return True
+ def write(self, b):
+ b = bytes(b)
+ n = -1
+ if self._blocker_char:
+ try:
+ n = b.index(self._blocker_char)
+ except ValueError:
+ pass
+ else:
+ self._blocker_char = None
+ self._write_stack.append(b[:n])
+ raise io.BlockingIOError(0, "test blocking", n)
+ self._write_stack.append(b)
+ return len(b)
+
class IOTest(unittest.TestCase):
@@ -355,6 +372,25 @@
class BufferedReaderTest(unittest.TestCase):
+
+ def testConstructor(self):
+ rawio = MockRawIO([b"abc"])
+ bufio = io.BufferedReader(rawio)
+ bufio.__init__(rawio)
+ bufio.__init__(rawio, buffer_size=1024)
+ bufio.__init__(rawio, buffer_size=16)
+ self.assertEquals(b"abc", bufio.read())
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.read)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.read)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ self.assertRaises(ValueError, bufio.read)
+ #self.assertRaises((OverflowError, MemoryError, ValueError),
+ #bufio.__init__, rawio, sys.maxsize)
+ rawio = MockRawIO([b"abc"])
+ bufio.__init__(rawio)
+ self.assertEquals(b"abc", bufio.read())
def testRead(self):
rawio = MockRawIO((b"abc", b"d", b"efg"))
@@ -461,43 +497,100 @@
class BufferedWriterTest(unittest.TestCase):
+ def testConstructor(self):
+ rawio = MockRawIO()
+ bufio = io.BufferedWriter(rawio)
+ bufio.__init__(rawio)
+ bufio.__init__(rawio, buffer_size=1024)
+ bufio.__init__(rawio, buffer_size=16)
+ self.assertEquals(3, bufio.write(b"abc"))
+ bufio.flush()
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ self.assertRaises((OverflowError, MemoryError, ValueError),
+ bufio.__init__, rawio, sys.maxsize)
+ bufio.__init__(rawio)
+ self.assertEquals(3, bufio.write(b"ghi"))
+ bufio.flush()
+ self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
+
def testWrite(self):
# Write to the buffered IO but don't overflow the buffer.
writer = MockRawIO()
bufio = io.BufferedWriter(writer, 8)
-
bufio.write(b"abc")
-
self.assertFalse(writer._write_stack)
def testWriteOverflow(self):
writer = MockRawIO()
bufio = io.BufferedWriter(writer, 8)
+ contents = b"abcdefghijklmnop"
+ for n in range(0, len(contents), 3):
+ bufio.write(contents[n:n+3])
+ flushed = b"".join(writer._write_stack)
+ # At least (total - 8) bytes were implicitly flushed, perhaps more
+ # depending on the implementation.
+ self.assert_(flushed.startswith(contents[:-8]), flushed)
+
+ def check_writes(self, intermediate_func):
+ # Lots of writes, test the flushed output is as expected.
+ contents = bytes(range(256)) * 1000
+ n = 0
+ writer = MockRawIO()
+ bufio = io.BufferedWriter(writer, 13)
+ # Generator of write sizes: repeat each N 15 times then proceed to N+1
+ def gen_sizes():
+ for size in count(1):
+ for i in range(15):
+ yield size
+ sizes = gen_sizes()
+ while n < len(contents):
+ size = min(next(sizes), len(contents) - n)
+ self.assertEquals(bufio.write(contents[n:n+size]), size)
+ intermediate_func(bufio)
+ n += size
+ bufio.flush()
+ self.assertEquals(contents, b"".join(writer._write_stack))
- bufio.write(b"abc")
- bufio.write(b"defghijkl")
+ def testWrites(self):
+ self.check_writes(lambda bufio: None)
+
+ def testWritesAndFlushes(self):
+ self.check_writes(lambda bufio: bufio.flush())
- self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
+ def testWritesAndSeeks(self):
+ self.check_writes(lambda bufio: bufio.seek(0, 1))
def testWriteNonBlocking(self):
- raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
- bufio = io.BufferedWriter(raw, 8, 16)
+ raw = MockNonBlockWriterIO()
+ bufio = io.BufferedWriter(raw, 8)
- bufio.write(b"asdf")
- bufio.write(b"asdfa")
- self.assertEquals(b"asdfasdfa", raw._write_stack[0])
-
- bufio.write(b"asdfasdfasdf")
- self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
- bufio.write(b"asdfasdfasdf")
- self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
- self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
-
- bufio.write(b"asdfasdfasdf")
-
- # XXX I don't like this test. It relies too heavily on how the
- # algorithm actually works, which we might change. Refactor
- # later.
+ self.assertEquals(bufio.write(b"abcd"), 4)
+ self.assertEquals(bufio.write(b"efghi"), 5)
+ # 1 byte will be written, the rest will be buffered
+ raw.block_on(b"k")
+ self.assertEquals(bufio.write(b"jklmnopqr"), 9)
+
+ # 4 bytes will be written, 8 will be buffered and the rest will be lost
+ raw.block_on(b"0")
+ try:
+ bufio.write(b"wxyz0123456789")
+ except io.BlockingIOError as e:
+ written = e.characters_written
+ else:
+ self.fail("BlockingIOError should have been raised")
+ self.assertEquals(written, 12)
+ self.assertEquals(raw.pop_written(),
+ b"abcdefghijklmnopqrwxyz")
+
+ self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
+ s = raw.pop_written()
+ # Previously buffered bytes were flushed
+ self.assertTrue(s.startswith(b"01234567A"), s)
def testFileno(self):
rawio = MockRawIO((b"abc", b"d", b"efg"))
@@ -508,16 +601,24 @@
def testFlush(self):
writer = MockRawIO()
bufio = io.BufferedWriter(writer, 8)
-
bufio.write(b"abc")
bufio.flush()
-
self.assertEquals(b"abc", writer._write_stack[0])
def testThreads(self):
- # BufferedWriter should not raise exceptions or crash
- # when called from multiple threads.
try:
+ # Write out many bytes from many threads and test they were
+ # all flushed.
+ N = 1000
+ contents = bytes(range(256)) * N
+ sizes = cycle([1, 19])
+ n = 0
+ queue = deque()
+ while n < len(contents):
+ size = next(sizes)
+ queue.append(contents[n:n+size])
+ n += size
+ del contents
# We use a real file object because it allows us to
# exercise situations where the GIL is released before
# writing the buffer to the raw streams. This is in addition
@@ -528,9 +629,11 @@
errors = []
def f():
try:
- # Write enough bytes to flush the buffer
- s = b"a" * 19
- for i in range(50):
+ while True:
+ try:
+ s = queue.popleft()
+ except IndexError:
+ return
bufio.write(s)
except Exception as e:
errors.append(e)
@@ -543,6 +646,11 @@
t.join()
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
+ bufio.close()
+ with io.open(support.TESTFN, "rb") as f:
+ s = f.read()
+ for i in range(256):
+ self.assertEquals(s.count(bytes([i])), N)
finally:
support.unlink(support.TESTFN)
More information about the Python-checkins
mailing list