[Python-3000-checkins] r65687 - in python/branches/py3k: Lib/io.py Lib/test/test_cmd_line.py Lib/test/test_io.py Misc/NEWS

antoine.pitrou python-3000-checkins at python.org
Fri Aug 15 00:44:29 CEST 2008


Author: antoine.pitrou
Date: Fri Aug 15 00:44:29 2008
New Revision: 65687

Log:
Merged revisions 65686 via svnmerge from 
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r65686 | antoine.pitrou | 2008-08-14 23:04:30 +0200 (jeu., 14 août 2008) | 3 lines
  
  Issue #3476: make BufferedReader and BufferedWriter thread-safe
........


Modified:
   python/branches/py3k/   (props changed)
   python/branches/py3k/Lib/io.py
   python/branches/py3k/Lib/test/test_cmd_line.py
   python/branches/py3k/Lib/test/test_io.py
   python/branches/py3k/Misc/NEWS

Modified: python/branches/py3k/Lib/io.py
==============================================================================
--- python/branches/py3k/Lib/io.py	(original)
+++ python/branches/py3k/Lib/io.py	Fri Aug 15 00:44:29 2008
@@ -61,6 +61,7 @@
 import codecs
 import _fileio
 import warnings
+import threading
 
 # open() uses st_blksize whenever we can
 DEFAULT_BUFFER_SIZE = 8 * 1024  # bytes
@@ -895,6 +896,7 @@
         _BufferedIOMixin.__init__(self, raw)
         self.buffer_size = buffer_size
         self._reset_read_buf()
+        self._read_lock = threading.Lock()
 
     def _reset_read_buf(self):
         self._read_buf = b""
@@ -908,6 +910,10 @@
         mode. If n is negative, read until EOF or until read() would
         block.
         """
+        with self._read_lock:
+            return self._read_unlocked(n)
+
+    def _read_unlocked(self, n=None):
         nodata_val = b""
         empty_values = (b"", None)
         buf = self._read_buf
@@ -960,6 +966,10 @@
         do at most one raw read to satisfy it.  We never return more
         than self.buffer_size.
         """
+        with self._read_lock:
+            return self._peek_unlocked(n)
+
+    def _peek_unlocked(self, n=0):
         want = min(n, self.buffer_size)
         have = len(self._read_buf) - self._read_pos
         if have < want:
@@ -976,18 +986,21 @@
         # only return buffered bytes.  Otherwise, we do one raw read.
         if n <= 0:
             return b""
-        self.peek(1)
-        return self.read(min(n, len(self._read_buf) - self._read_pos))
+        with self._read_lock:
+            self._peek_unlocked(1)
+            return self._read_unlocked(
+                min(n, len(self._read_buf) - self._read_pos))
 
     def tell(self):
         return self.raw.tell() - len(self._read_buf) + self._read_pos
 
     def seek(self, pos, whence=0):
-        if whence == 1:
-            pos -= len(self._read_buf) - self._read_pos
-        pos = self.raw.seek(pos, whence)
-        self._reset_read_buf()
-        return pos
+        with self._read_lock:
+            if whence == 1:
+                pos -= len(self._read_buf) - self._read_pos
+            pos = self.raw.seek(pos, whence)
+            self._reset_read_buf()
+            return pos
 
 
 class BufferedWriter(_BufferedIOMixin):
@@ -1009,43 +1022,51 @@
                                 if max_buffer_size is None
                                 else max_buffer_size)
         self._write_buf = bytearray()
+        self._write_lock = threading.Lock()
 
     def write(self, b):
         if self.closed:
             raise ValueError("write to closed file")
         if isinstance(b, str):
             raise TypeError("can't write str to binary stream")
-        # XXX we can implement some more tricks to try and avoid partial writes
-        if len(self._write_buf) > self.buffer_size:
-            # We're full, so let's pre-flush the buffer
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                # We can't accept anything else.
-                # XXX Why not just let the exception pass through?
-                raise BlockingIOError(e.errno, e.strerror, 0)
-        before = len(self._write_buf)
-        self._write_buf.extend(b)
-        written = len(self._write_buf) - before
-        if len(self._write_buf) > self.buffer_size:
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                if (len(self._write_buf) > self.max_buffer_size):
-                    # We've hit max_buffer_size. We have to accept a partial
-                    # write and cut back our buffer.
-                    overage = len(self._write_buf) - self.max_buffer_size
-                    self._write_buf = self._write_buf[:self.max_buffer_size]
-                    raise BlockingIOError(e.errno, e.strerror, overage)
-        return written
+        with self._write_lock:
+            # XXX we can implement some more tricks to try and avoid
+            # partial writes
+            if len(self._write_buf) > self.buffer_size:
+                # We're full, so let's pre-flush the buffer
+                try:
+                    self._flush_unlocked()
+                except BlockingIOError as e:
+                    # We can't accept anything else.
+                    # XXX Why not just let the exception pass through?
+                    raise BlockingIOError(e.errno, e.strerror, 0)
+            before = len(self._write_buf)
+            self._write_buf.extend(b)
+            written = len(self._write_buf) - before
+            if len(self._write_buf) > self.buffer_size:
+                try:
+                    self._flush_unlocked()
+                except BlockingIOError as e:
+                    if len(self._write_buf) > self.max_buffer_size:
+                        # We've hit max_buffer_size. We have to accept a
+                        # partial write and cut back our buffer.
+                        overage = len(self._write_buf) - self.max_buffer_size
+                        self._write_buf = self._write_buf[:self.max_buffer_size]
+                        raise BlockingIOError(e.errno, e.strerror, overage)
+            return written
 
     def truncate(self, pos=None):
-        self.flush()
-        if pos is None:
-            pos = self.raw.tell()
-        return self.raw.truncate(pos)
+        with self._write_lock:
+            self._flush_unlocked()
+            if pos is None:
+                pos = self.raw.tell()
+            return self.raw.truncate(pos)
 
     def flush(self):
+        with self._write_lock:
+            self._flush_unlocked()
+
+    def _flush_unlocked(self):
         if self.closed:
             raise ValueError("flush of closed file")
         written = 0
@@ -1064,8 +1085,9 @@
         return self.raw.tell() + len(self._write_buf)
 
     def seek(self, pos, whence=0):
-        self.flush()
-        return self.raw.seek(pos, whence)
+        with self._write_lock:
+            self._flush_unlocked()
+            return self.raw.seek(pos, whence)
 
 
 class BufferedRWPair(BufferedIOBase):
@@ -1155,7 +1177,8 @@
         # First do the raw seek, then empty the read buffer, so that
         # if the raw seek fails, we don't lose buffered data forever.
         pos = self.raw.seek(pos, whence)
-        self._reset_read_buf()
+        with self._read_lock:
+            self._reset_read_buf()
         return pos
 
     def tell(self):
@@ -1192,8 +1215,9 @@
     def write(self, b):
         if self._read_buf:
             # Undo readahead
-            self.raw.seek(self._read_pos - len(self._read_buf), 1)
-            self._reset_read_buf()
+            with self._read_lock:
+                self.raw.seek(self._read_pos - len(self._read_buf), 1)
+                self._reset_read_buf()
         return BufferedWriter.write(self, b)
 
 

Modified: python/branches/py3k/Lib/test/test_cmd_line.py
==============================================================================
--- python/branches/py3k/Lib/test/test_cmd_line.py	(original)
+++ python/branches/py3k/Lib/test/test_cmd_line.py	Fri Aug 15 00:44:29 2008
@@ -3,11 +3,15 @@
 # See test_cmd_line_script.py for testing of script execution
 
 import test.support, unittest
+import os
 import sys
 import subprocess
 
 def _spawn_python(*args):
-    cmd_line = [sys.executable, '-E']
+    cmd_line = [sys.executable]
+    # When testing -S, we need PYTHONPATH to work (see test_site_flag())
+    if '-S' not in args:
+        cmd_line.append('-E')
     cmd_line.extend(args)
     return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
@@ -59,6 +63,16 @@
         self.verify_valid_flag('-Qwarnall')
 
     def test_site_flag(self):
+        if os.name == 'posix':
+            # Workaround bug #586680 by adding the extension dir to PYTHONPATH
+            from distutils.util import get_platform
+            s = "./build/lib.%s-%.3s" % (get_platform(), sys.version)
+            if hasattr(sys, 'gettotalrefcount'):
+                s += '-pydebug'
+            p = os.environ.get('PYTHONPATH', '')
+            if p:
+                p += ':'
+            os.environ['PYTHONPATH'] = p + s
         self.verify_valid_flag('-S')
 
     def test_usage(self):

Modified: python/branches/py3k/Lib/test/test_io.py
==============================================================================
--- python/branches/py3k/Lib/test/test_io.py	(original)
+++ python/branches/py3k/Lib/test/test_io.py	Fri Aug 15 00:44:29 2008
@@ -4,8 +4,10 @@
 import sys
 import time
 import array
+import threading
+import random
 import unittest
-from itertools import chain
+from itertools import chain, cycle
 from test import support
 
 import codecs
@@ -390,6 +392,49 @@
         # this test. Else, write it.
         pass
 
+    def testThreads(self):
+        try:
+            # Write out many bytes with exactly the same number of 0's,
+            # 1's... 255's. This will help us check that concurrent reading
+            # doesn't duplicate or forget contents.
+            N = 1000
+            l = list(range(256)) * N
+            random.shuffle(l)
+            s = bytes(bytearray(l))
+            with io.open(support.TESTFN, "wb") as f:
+                f.write(s)
+            with io.open(support.TESTFN, "rb", buffering=0) as raw:
+                bufio = io.BufferedReader(raw, 8)
+                errors = []
+                results = []
+                def f():
+                    try:
+                        # Intra-buffer read then buffer-flushing read
+                        for n in cycle([1, 19]):
+                            s = bufio.read(n)
+                            if not s:
+                                break
+                            # list.append() is atomic
+                            results.append(s)
+                    except Exception as e:
+                        errors.append(e)
+                        raise
+                threads = [threading.Thread(target=f) for x in range(20)]
+                for t in threads:
+                    t.start()
+                time.sleep(0.02) # yield
+                for t in threads:
+                    t.join()
+                self.assertFalse(errors,
+                    "the following exceptions were caught: %r" % errors)
+                s = b''.join(results)
+                for i in range(256):
+                    c = bytes(bytearray([i]))
+                    self.assertEqual(s.count(c), N)
+        finally:
+            support.unlink(support.TESTFN)
+
+
 
 class BufferedWriterTest(unittest.TestCase):
 
@@ -446,6 +491,38 @@
 
         self.assertEquals(b"abc", writer._write_stack[0])
 
+    def testThreads(self):
+        # BufferedWriter should not raise exceptions or crash
+        # when called from multiple threads.
+        try:
+            # We use a real file object because it allows us to
+            # exercise situations where the GIL is released before
+            # writing the buffer to the raw streams. This is in addition
+            # to concurrency issues due to switching threads in the middle
+            # of Python code.
+            with io.open(support.TESTFN, "wb", buffering=0) as raw:
+                bufio = io.BufferedWriter(raw, 8)
+                errors = []
+                def f():
+                    try:
+                        # Write enough bytes to flush the buffer
+                        s = b"a" * 19
+                        for i in range(50):
+                            bufio.write(s)
+                    except Exception as e:
+                        errors.append(e)
+                        raise
+                threads = [threading.Thread(target=f) for x in range(20)]
+                for t in threads:
+                    t.start()
+                time.sleep(0.02) # yield
+                for t in threads:
+                    t.join()
+                self.assertFalse(errors,
+                    "the following exceptions were caught: %r" % errors)
+        finally:
+            support.unlink(support.TESTFN)
+
 
 class BufferedRWPairTest(unittest.TestCase):
 

Modified: python/branches/py3k/Misc/NEWS
==============================================================================
--- python/branches/py3k/Misc/NEWS	(original)
+++ python/branches/py3k/Misc/NEWS	Fri Aug 15 00:44:29 2008
@@ -30,6 +30,9 @@
 Library
 -------
 
+- Issue #3476: binary buffered reading and writing through the new "io"
+  library are now thread-safe.
+
 - Issue #1342811: Fix leak in Tkinter.Menu.delete. Commands associated to
   menu entries were not deleted.
 


More information about the Python-3000-checkins mailing list