[Python-checkins] cpython (merge 3.4 -> default): Merge 3.4->default: asyncio: Fix upstream issue 168: StreamReader.read(-1) from

guido.van.rossum python-checkins at python.org
Mon May 12 19:05:19 CEST 2014


http://hg.python.org/cpython/rev/2af5a52b9b87
changeset:   90663:2af5a52b9b87
parent:      90661:9493fdad2a75
parent:      90662:909ea8cc86bb
user:        Guido van Rossum <guido at python.org>
date:        Mon May 12 10:05:04 2014 -0700
summary:
  Merge 3.4->default: asyncio: Fix upstream issue 168: StreamReader.read(-1) from pipe may hang if data exceeds buffer limit.

files:
  Lib/asyncio/streams.py                |  17 ++++--
  Lib/test/test_asyncio/test_streams.py |  36 +++++++++++++++
  2 files changed, 47 insertions(+), 6 deletions(-)


diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -419,12 +419,17 @@
             return b''
 
         if n < 0:
-            while not self._eof:
-                self._waiter = self._create_waiter('read')
-                try:
-                    yield from self._waiter
-                finally:
-                    self._waiter = None
+            # This used to just loop creating a new waiter hoping to
+            # collect everything in self._buffer, but that would
+            # deadlock if the subprocess sends more than self.limit
+            # bytes.  So just call self.read(self._limit) until EOF.
+            blocks = []
+            while True:
+                block = yield from self.read(self._limit)
+                if not block:
+                    break
+                blocks.append(block)
+            return b''.join(blocks)
         else:
             if not self._buffer and not self._eof:
                 self._waiter = self._create_waiter('read')
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -1,7 +1,9 @@
 """Tests for streams.py."""
 
 import gc
+import os
 import socket
+import sys
 import unittest
 from unittest import mock
 try:
@@ -583,6 +585,40 @@
             server.stop()
             self.assertEqual(msg, b"hello world!\n")
 
+    @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
+    def test_read_all_from_pipe_reader(self):
+        # See Tulip issue 168.  This test is derived from the example
+        # subprocess_attach_read_pipe.py, but we configure the
+        # StreamReader's limit so that twice it is less than the size
+        # of the data writter.  Also we must explicitly attach a child
+        # watcher to the event loop.
+
+        watcher = asyncio.get_child_watcher()
+        watcher.attach_loop(self.loop)
+
+        code = """\
+import os, sys
+fd = int(sys.argv[1])
+os.write(fd, b'data')
+os.close(fd)
+"""
+        rfd, wfd = os.pipe()
+        args = [sys.executable, '-c', code, str(wfd)]
+
+        pipe = open(rfd, 'rb', 0)
+        reader = asyncio.StreamReader(loop=self.loop, limit=1)
+        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
+        transport, _ = self.loop.run_until_complete(
+            self.loop.connect_read_pipe(lambda: protocol, pipe))
+
+        proc = self.loop.run_until_complete(
+            asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
+        self.loop.run_until_complete(proc.wait())
+
+        os.close(wfd)
+        data = self.loop.run_until_complete(reader.read(-1))
+        self.assertEqual(data, b'data')
+
 
 if __name__ == '__main__':
     unittest.main()

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list