[Python-checkins] cpython (3.5): Don't select for read on character devices in _UnixWritePipeTransport.

guido.van.rossum python-checkins at python.org
Wed Aug 31 12:53:32 EDT 2016


https://hg.python.org/cpython/rev/fa8fadc7f065
changeset:   102973:fa8fadc7f065
branch:      3.5
parent:      102971:2a748320616b
user:        Guido van Rossum <guido at python.org>
date:        Wed Aug 31 09:40:18 2016 -0700
summary:
  Don't select for read on character devices in _UnixWritePipeTransport.

Upstream https://github.com/python/asyncio/pull/374 by Ron Frederick.

files:
  Lib/asyncio/unix_events.py           |   8 +-
  Lib/test/test_asyncio/test_events.py |  75 ++++++++++++++++
  2 files changed, 79 insertions(+), 4 deletions(-)


diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -422,10 +422,10 @@
         self._pipe = pipe
         self._fileno = pipe.fileno()
         mode = os.fstat(self._fileno).st_mode
+        is_char = stat.S_ISCHR(mode)
+        is_fifo = stat.S_ISFIFO(mode)
         is_socket = stat.S_ISSOCK(mode)
-        if not (is_socket or
-                stat.S_ISFIFO(mode) or
-                stat.S_ISCHR(mode)):
+        if not (is_char or is_fifo or is_socket):
             raise ValueError("Pipe transport is only for "
                              "pipes, sockets and character devices")
         _set_nonblocking(self._fileno)
@@ -439,7 +439,7 @@
         # On AIX, the reader trick (to be notified when the read end of the
         # socket is closed) only works for sockets. On other platforms it
         # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
-        if is_socket or not sys.platform.startswith("aix"):
+        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
             # only start reading when connection_made() has been called
             self._loop.call_soon(self._loop.add_reader,
                                  self._fileno, self._read_ready)
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -21,6 +21,8 @@
 from unittest import mock
 import weakref
 
+if sys.platform != 'win32':
+    import tty
 
 import asyncio
 from asyncio import proactor_events
@@ -1626,6 +1628,79 @@
         self.loop.run_until_complete(proto.done)
         self.assertEqual('CLOSED', proto.state)
 
+    @unittest.skipUnless(sys.platform != 'win32',
+                         "Don't support pipes for Windows")
+    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
+    # older than 10.6 (Snow Leopard)
+    @support.requires_mac_ver(10, 6)
+    def test_bidirectional_pty(self):
+        master, read_slave = os.openpty()
+        write_slave = os.dup(read_slave)
+        tty.setraw(read_slave)
+
+        slave_read_obj = io.open(read_slave, 'rb', 0)
+        read_proto = MyReadPipeProto(loop=self.loop)
+        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
+                                                   slave_read_obj)
+        read_transport, p = self.loop.run_until_complete(read_connect)
+        self.assertIs(p, read_proto)
+        self.assertIs(read_transport, read_proto.transport)
+        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+        self.assertEqual(0, read_proto.nbytes)
+
+
+        slave_write_obj = io.open(write_slave, 'wb', 0)
+        write_proto = MyWritePipeProto(loop=self.loop)
+        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
+                                                     slave_write_obj)
+        write_transport, p = self.loop.run_until_complete(write_connect)
+        self.assertIs(p, write_proto)
+        self.assertIs(write_transport, write_proto.transport)
+        self.assertEqual('CONNECTED', write_proto.state)
+
+        data = bytearray()
+        def reader(data):
+            chunk = os.read(master, 1024)
+            data += chunk
+            return len(data)
+
+        write_transport.write(b'1')
+        test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
+        self.assertEqual(b'1', data)
+        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+        self.assertEqual('CONNECTED', write_proto.state)
+
+        os.write(master, b'a')
+        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
+                             timeout=10)
+        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+        self.assertEqual(1, read_proto.nbytes)
+        self.assertEqual('CONNECTED', write_proto.state)
+
+        write_transport.write(b'2345')
+        test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
+        self.assertEqual(b'12345', data)
+        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+        self.assertEqual('CONNECTED', write_proto.state)
+
+        os.write(master, b'bcde')
+        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
+                             timeout=10)
+        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
+        self.assertEqual(5, read_proto.nbytes)
+        self.assertEqual('CONNECTED', write_proto.state)
+
+        os.close(master)
+
+        read_transport.close()
+        self.loop.run_until_complete(read_proto.done)
+        self.assertEqual(
+            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
+
+        write_transport.close()
+        self.loop.run_until_complete(write_proto.done)
+        self.assertEqual('CLOSED', write_proto.state)
+
     def test_prompt_cancellation(self):
         r, w = test_utils.socketpair()
         r.setblocking(False)

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list