[Python-checkins] cpython: Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module

victor.stinner python-checkins at python.org
Sat Feb 1 22:54:38 CET 2014


http://hg.python.org/cpython/rev/d7ac90c0463a
changeset:   88889:d7ac90c0463a
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Sat Feb 01 22:49:59 2014 +0100
summary:
  Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module

* Add a new asyncio.subprocess module
* Add new create_subprocess_exec() and create_subprocess_shell() functions
* The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers
  for stdout and stderr and a stream writer for stdin.
* The new asyncio.subprocess.Process class offers an API close to the
  subprocess.Popen class:

  - pid, returncode, stdin, stdout and stderr attributes
  - communicate(), wait(), send_signal(), terminate() and kill() methods

* Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess
  and unix_events, so that they are not confused with the symbols of the same
  name in the subprocess and asyncio.subprocess modules
* _ProactorBasePipeTransport.get_write_buffer_size() now also counts the size
  of the pending write
* _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if
  the write buffer size is greater than the high water mark (64 KB by default)

files:
  Lib/asyncio/__init__.py                      |    2 +
  Lib/asyncio/base_subprocess.py               |   23 +-
  Lib/asyncio/proactor_events.py               |   34 +-
  Lib/asyncio/subprocess.py                    |  197 ++++++++++
  Lib/asyncio/unix_events.py                   |    7 +-
  Lib/test/test_asyncio/test_base_events.py    |   15 +-
  Lib/test/test_asyncio/test_events.py         |    8 -
  Lib/test/test_asyncio/test_subprocess.py     |  196 +++++++++
  Lib/test/test_asyncio/test_windows_events.py |    2 +-
  9 files changed, 434 insertions(+), 50 deletions(-)


diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -24,6 +24,7 @@
 from .protocols import *
 from .queues import *
 from .streams import *
+from .subprocess import *
 from .tasks import *
 from .transports import *
 
@@ -39,5 +40,6 @@
            protocols.__all__ +
            queues.__all__ +
            streams.__all__ +
+           subprocess.__all__ +
            tasks.__all__ +
            transports.__all__)
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -6,11 +6,6 @@
 from . import transports
 
 
-STDIN = 0
-STDOUT = 1
-STDERR = 2
-
-
 class BaseSubprocessTransport(transports.SubprocessTransport):
 
     def __init__(self, loop, protocol, args, shell,
@@ -22,11 +17,11 @@
 
         self._pipes = {}
         if stdin == subprocess.PIPE:
-            self._pipes[STDIN] = None
+            self._pipes[0] = None
         if stdout == subprocess.PIPE:
-            self._pipes[STDOUT] = None
+            self._pipes[1] = None
         if stderr == subprocess.PIPE:
-            self._pipes[STDERR] = None
+            self._pipes[2] = None
         self._pending_calls = collections.deque()
         self._finished = False
         self._returncode = None
@@ -76,19 +71,19 @@
         loop = self._loop
         if proc.stdin is not None:
             _, pipe = yield from loop.connect_write_pipe(
-                lambda: WriteSubprocessPipeProto(self, STDIN),
+                lambda: WriteSubprocessPipeProto(self, 0),
                 proc.stdin)
-            self._pipes[STDIN] = pipe
+            self._pipes[0] = pipe
         if proc.stdout is not None:
             _, pipe = yield from loop.connect_read_pipe(
-                lambda: ReadSubprocessPipeProto(self, STDOUT),
+                lambda: ReadSubprocessPipeProto(self, 1),
                 proc.stdout)
-            self._pipes[STDOUT] = pipe
+            self._pipes[1] = pipe
         if proc.stderr is not None:
             _, pipe = yield from loop.connect_read_pipe(
-                lambda: ReadSubprocessPipeProto(self, STDERR),
+                lambda: ReadSubprocessPipeProto(self, 2),
                 proc.stderr)
-            self._pipes[STDERR] = pipe
+            self._pipes[2] = pipe
 
         assert self._pending_calls is not None
 
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -29,6 +29,7 @@
         self._buffer = None  # None or bytearray.
         self._read_fut = None
         self._write_fut = None
+        self._pending_write = 0
         self._conn_lost = 0
         self._closing = False  # Set when close() called.
         self._eof_written = False
@@ -68,6 +69,7 @@
         if self._read_fut:
             self._read_fut.cancel()
         self._write_fut = self._read_fut = None
+        self._pending_write = 0
         self._buffer = None
         self._loop.call_soon(self._call_connection_lost, exc)
 
@@ -128,11 +130,10 @@
         self._low_water = low
 
     def get_write_buffer_size(self):
-        # NOTE: This doesn't take into account data already passed to
-        # send() even if send() hasn't finished yet.
-        if not self._buffer:
-            return 0
-        return len(self._buffer)
+        size = self._pending_write
+        if self._buffer is not None:
+            size += len(self._buffer)
+        return size
 
 
 class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
@@ -206,7 +207,7 @@
 
 
 class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
-                                  transports.WriteTransport):
+                                      transports.WriteTransport):
     """Transport for write pipes."""
 
     def write(self, data):
@@ -252,6 +253,7 @@
         try:
             assert f is self._write_fut
             self._write_fut = None
+            self._pending_write = 0
             if f:
                 f.result()
             if data is None:
@@ -262,15 +264,21 @@
                     self._loop.call_soon(self._call_connection_lost, None)
                 if self._eof_written:
                     self._sock.shutdown(socket.SHUT_WR)
+                # Now that we've reduced the buffer size, tell the
+                # protocol to resume writing if it was paused.  Note that
+                # we do this last since the callback is called immediately
+                # and it may add more data to the buffer (even causing the
+                # protocol to be paused again).
+                self._maybe_resume_protocol()
             else:
                 self._write_fut = self._loop._proactor.send(self._sock, data)
-                self._write_fut.add_done_callback(self._loop_writing)
-            # Now that we've reduced the buffer size, tell the
-            # protocol to resume writing if it was paused.  Note that
-            # we do this last since the callback is called immediately
-            # and it may add more data to the buffer (even causing the
-            # protocol to be paused again).
-            self._maybe_resume_protocol()
+                if not self._write_fut.done():
+                    assert self._pending_write == 0
+                    self._pending_write = len(data)
+                    self._write_fut.add_done_callback(self._loop_writing)
+                    self._maybe_pause_protocol()
+                else:
+                    self._write_fut.add_done_callback(self._loop_writing)
         except ConnectionResetError as exc:
             self._force_close(exc)
         except OSError as exc:
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
new file mode 100644
--- /dev/null
+++ b/Lib/asyncio/subprocess.py
@@ -0,0 +1,197 @@
+__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
+
+import collections
+import subprocess
+
+from . import events
+from . import futures
+from . import protocols
+from . import streams
+from . import tasks
+
+
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+DEVNULL = subprocess.DEVNULL
+
+
+class SubprocessStreamProtocol(streams.FlowControlMixin,
+                               protocols.SubprocessProtocol):
+    """Like StreamReaderProtocol, but for a subprocess."""
+
+    def __init__(self, limit, loop):
+        super().__init__(loop=loop)
+        self._limit = limit
+        self.stdin = self.stdout = self.stderr = None
+        self.waiter = futures.Future(loop=loop)
+        self._waiters = collections.deque()
+        self._transport = None
+
+    def connection_made(self, transport):
+        self._transport = transport
+        if transport.get_pipe_transport(1):
+            self.stdout = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+        if transport.get_pipe_transport(2):
+            self.stderr = streams.StreamReader(limit=self._limit,
+                                               loop=self._loop)
+        stdin = transport.get_pipe_transport(0)
+        if stdin is not None:
+            self.stdin = streams.StreamWriter(stdin,
+                                              protocol=self,
+                                              reader=None,
+                                              loop=self._loop)
+        self.waiter.set_result(None)
+
+    def pipe_data_received(self, fd, data):
+        if fd == 1:
+            reader = self.stdout
+        elif fd == 2:
+            reader = self.stderr
+        else:
+            reader = None
+        if reader is not None:
+            reader.feed_data(data)
+
+    def pipe_connection_lost(self, fd, exc):
+        if fd == 0:
+            pipe = self.stdin
+            if pipe is not None:
+                pipe.close()
+            self.connection_lost(exc)
+            return
+        if fd == 1:
+            reader = self.stdout
+        elif fd == 2:
+            reader = self.stderr
+        else:
+            reader = None
+        if reader != None:
+            if exc is None:
+                reader.feed_eof()
+            else:
+                reader.set_exception(exc)
+
+    def process_exited(self):
+        # wake up futures waiting for wait()
+        returncode = self._transport.get_returncode()
+        while self._waiters:
+            waiter = self._waiters.popleft()
+            waiter.set_result(returncode)
+
+
+class Process:
+    def __init__(self, transport, protocol, loop):
+        self._transport = transport
+        self._protocol = protocol
+        self._loop = loop
+        self.stdin = protocol.stdin
+        self.stdout = protocol.stdout
+        self.stderr = protocol.stderr
+        self.pid = transport.get_pid()
+
+    @property
+    def returncode(self):
+        return self._transport.get_returncode()
+
+    @tasks.coroutine
+    def wait(self):
+        """Wait until the process exit and return the process return code."""
+        returncode = self._transport.get_returncode()
+        if returncode is not None:
+            return returncode
+
+        waiter = futures.Future(loop=self._loop)
+        self._protocol._waiters.append(waiter)
+        yield from waiter
+        return waiter.result()
+
+    def get_subprocess(self):
+        return self._transport.get_extra_info('subprocess')
+
+    def _check_alive(self):
+        if self._transport.get_returncode() is not None:
+            raise ProcessLookupError()
+
+    def send_signal(self, signal):
+        self._check_alive()
+        self._transport.send_signal(signal)
+
+    def terminate(self):
+        self._check_alive()
+        self._transport.terminate()
+
+    def kill(self):
+        self._check_alive()
+        self._transport.kill()
+
+    @tasks.coroutine
+    def _feed_stdin(self, input):
+        self.stdin.write(input)
+        yield from self.stdin.drain()
+        self.stdin.close()
+
+    @tasks.coroutine
+    def _noop(self):
+        return None
+
+    @tasks.coroutine
+    def _read_stream(self, fd):
+        transport = self._transport.get_pipe_transport(fd)
+        if fd == 2:
+            stream = self.stderr
+        else:
+            assert fd == 1
+            stream = self.stdout
+        output = yield from stream.read()
+        transport.close()
+        return output
+
+    @tasks.coroutine
+    def communicate(self, input=None):
+        loop = self._transport._loop
+        if input:
+            stdin = self._feed_stdin(input)
+        else:
+            stdin = self._noop()
+        if self.stdout is not None:
+            stdout = self._read_stream(1)
+        else:
+            stdout = self._noop()
+        if self.stderr is not None:
+            stderr = self._read_stream(2)
+        else:
+            stderr = self._noop()
+        stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
+                                                        loop=loop)
+        yield from self.wait()
+        return (stdout, stderr)
+
+
+@tasks.coroutine
+def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
+                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+    if loop is None:
+        loop = events.get_event_loop()
+    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+                                                        loop=loop)
+    transport, protocol = yield from loop.subprocess_shell(
+                                            protocol_factory,
+                                            cmd, stdin=stdin, stdout=stdout,
+                                            stderr=stderr, **kwds)
+    yield from protocol.waiter
+    return Process(transport, protocol, loop)
+
+@tasks.coroutine
+def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None,
+                           loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+    if loop is None:
+        loop = events.get_event_loop()
+    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+                                                        loop=loop)
+    transport, protocol = yield from loop.subprocess_exec(
+                                            protocol_factory,
+                                            *args, stdin=stdin, stdout=stdout,
+                                            stderr=stderr, **kwds)
+    yield from protocol.waiter
+    return Process(transport, protocol, loop)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -21,16 +21,11 @@
 from .log import logger
 
 
-__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
+__all__ = ['SelectorEventLoop',
            'AbstractChildWatcher', 'SafeChildWatcher',
            'FastChildWatcher', 'DefaultEventLoopPolicy',
            ]
 
-STDIN = 0
-STDOUT = 1
-STDERR = 2
-
-
 if sys.platform == 'win32':  # pragma: no cover
     raise ImportError('Signals are not really supported on Windows')
 
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -116,18 +116,17 @@
             self.loop.stop()
 
         self.loop._process_events = unittest.mock.Mock()
-        delay = 0.1
-
-        when = self.loop.time() + delay
+        when = self.loop.time() + 0.1
         self.loop.call_at(when, cb)
         t0 = self.loop.time()
         self.loop.run_forever()
         dt = self.loop.time() - t0
-
-        self.assertGreaterEqual(dt, delay - self.loop._granularity, dt)
-        # tolerate a difference of +800 ms because some Python buildbots
-        # are really slow
-        self.assertLessEqual(dt, 0.9, dt)
+        self.assertTrue(0.09 <= dt <= 0.9,
+                        # Issue #20452: add more info in case of failure,
+                        # to try to investigate the bug
+                        (dt,
+                         self.loop._granularity,
+                         time.get_clock_info('monotonic')))
 
     def test_run_once_in_executor_handle(self):
         def cb():
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1179,14 +1179,6 @@
         calls.append(self.loop._run_once_counter)
         self.assertEqual(calls, [1, 3, 5, 6])
 
-    def test_granularity(self):
-        granularity = self.loop._granularity
-        self.assertGreater(granularity, 0.0)
-        # Worst expected granularity: 1 ms on Linux (limited by poll/epoll
-        # resolution), 15.6 ms on Windows (limited by time.monotonic
-        # resolution)
-        self.assertLess(granularity, 0.050)
-
 
 class SubprocessTestsMixin:
 
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -0,0 +1,196 @@
+from asyncio import subprocess
+import asyncio
+import signal
+import sys
+import unittest
+from test import support
+if sys.platform != 'win32':
+    from asyncio import unix_events
+
+# Program exiting quickly
+PROGRAM_EXIT_FAST = [sys.executable, '-c', 'pass']
+
+# Program blocking
+PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
+
+# Program sleeping during 1 second
+PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)']
+
+# Program copying input to output
+PROGRAM_CAT = [
+    sys.executable, '-c',
+    ';'.join(('import sys',
+              'data = sys.stdin.buffer.read()',
+              'sys.stdout.buffer.write(data)'))]
+
+class SubprocessMixin:
+    def test_stdin_stdout(self):
+        args = PROGRAM_CAT
+
+        @asyncio.coroutine
+        def run(data):
+            proc = yield from asyncio.create_subprocess_exec(
+                                          *args,
+                                          stdin=subprocess.PIPE,
+                                          stdout=subprocess.PIPE,
+                                          loop=self.loop)
+
+            # feed data
+            proc.stdin.write(data)
+            yield from proc.stdin.drain()
+            proc.stdin.close()
+
+            # get output and exitcode
+            data = yield from proc.stdout.read()
+            exitcode = yield from proc.wait()
+            return (exitcode, data)
+
+        task = run(b'some data')
+        task = asyncio.wait_for(task, 10.0, loop=self.loop)
+        exitcode, stdout = self.loop.run_until_complete(task)
+        self.assertEqual(exitcode, 0)
+        self.assertEqual(stdout, b'some data')
+
+    def test_communicate(self):
+        args = PROGRAM_CAT
+
+        @asyncio.coroutine
+        def run(data):
+            proc = yield from asyncio.create_subprocess_exec(
+                                          *args,
+                                          stdin=subprocess.PIPE,
+                                          stdout=subprocess.PIPE,
+                                          loop=self.loop)
+            stdout, stderr = yield from proc.communicate(data)
+            return proc.returncode, stdout
+
+        task = run(b'some data')
+        task = asyncio.wait_for(task, 10.0, loop=self.loop)
+        exitcode, stdout = self.loop.run_until_complete(task)
+        self.assertEqual(exitcode, 0)
+        self.assertEqual(stdout, b'some data')
+
+    def test_shell(self):
+        create = asyncio.create_subprocess_shell('exit 7',
+                                                 loop=self.loop)
+        proc = self.loop.run_until_complete(create)
+        exitcode = self.loop.run_until_complete(proc.wait())
+        self.assertEqual(exitcode, 7)
+
+    def test_start_new_session(self):
+        # start the new process in a new session
+        create = asyncio.create_subprocess_shell('exit 8',
+                                                 start_new_session=True,
+                                                 loop=self.loop)
+        proc = self.loop.run_until_complete(create)
+        exitcode = self.loop.run_until_complete(proc.wait())
+        self.assertEqual(exitcode, 8)
+
+    def test_kill(self):
+        args = PROGRAM_BLOCKED
+        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+        proc = self.loop.run_until_complete(create)
+        proc.kill()
+        returncode = self.loop.run_until_complete(proc.wait())
+        if sys.platform == 'win32':
+            self.assertIsInstance(returncode, int)
+            # expect 1 but sometimes get 0
+        else:
+            self.assertEqual(-signal.SIGKILL, returncode)
+
+    def test_terminate(self):
+        args = PROGRAM_BLOCKED
+        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+        proc = self.loop.run_until_complete(create)
+        proc.terminate()
+        returncode = self.loop.run_until_complete(proc.wait())
+        if sys.platform == 'win32':
+            self.assertIsInstance(returncode, int)
+            # expect 1 but sometimes get 0
+        else:
+            self.assertEqual(-signal.SIGTERM, returncode)
+
+    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
+    def test_send_signal(self):
+        args = PROGRAM_BLOCKED
+        create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+        proc = self.loop.run_until_complete(create)
+        proc.send_signal(signal.SIGHUP)
+        returncode = self.loop.run_until_complete(proc.wait())
+        self.assertEqual(-signal.SIGHUP, returncode)
+
+    def test_get_subprocess(self):
+        args = PROGRAM_EXIT_FAST
+
+        @asyncio.coroutine
+        def run():
+            proc = yield from asyncio.create_subprocess_exec(*args,
+                                                             loop=self.loop)
+            yield from proc.wait()
+
+            popen = proc.get_subprocess()
+            popen.wait()
+            return (proc, popen)
+
+        proc, popen = self.loop.run_until_complete(run())
+        self.assertEqual(popen.returncode, proc.returncode)
+        self.assertEqual(popen.pid, proc.pid)
+
+    def test_broken_pipe(self):
+        large_data = b'x' * support.PIPE_MAX_SIZE
+
+        create = asyncio.create_subprocess_exec(
+                             *PROGRAM_SLEEP_1SEC,
+                             stdin=subprocess.PIPE,
+                             loop=self.loop)
+        proc = self.loop.run_until_complete(create)
+        with self.assertRaises(BrokenPipeError):
+            self.loop.run_until_complete(proc.communicate(large_data))
+        self.loop.run_until_complete(proc.wait())
+
+
+if sys.platform != 'win32':
+    # Unix
+    class SubprocessWatcherMixin(SubprocessMixin):
+        Watcher = None
+
+        def setUp(self):
+            policy = asyncio.get_event_loop_policy()
+            self.loop = policy.new_event_loop()
+
+            # ensure that the event loop is passed explicitly in the code
+            policy.set_event_loop(None)
+
+            watcher = self.Watcher()
+            watcher.attach_loop(self.loop)
+            policy.set_child_watcher(watcher)
+
+        def tearDown(self):
+            policy = asyncio.get_event_loop_policy()
+            policy.set_child_watcher(None)
+            self.loop.close()
+            policy.set_event_loop(None)
+
+    class SubprocessSafeWatcherTests(SubprocessWatcherMixin, unittest.TestCase):
+        Watcher = unix_events.SafeChildWatcher
+
+    class SubprocessFastWatcherTests(SubprocessWatcherMixin, unittest.TestCase):
+        Watcher = unix_events.FastChildWatcher
+else:
+    # Windows
+    class SubprocessProactorTests(SubprocessMixin, unittest.TestCase):
+        def setUp(self):
+            policy = asyncio.get_event_loop_policy()
+            self.loop = asyncio.ProactorEventLoop()
+
+            # ensure that the event loop is passed explicitly in the code
+            policy.set_event_loop(None)
+
+        def tearDown(self):
+            policy = asyncio.get_event_loop_policy()
+            self.loop.close()
+            policy.set_event_loop(None)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -105,7 +105,7 @@
         self.loop.run_until_complete(f)
         elapsed = self.loop.time() - start
         self.assertFalse(f.result())
-        self.assertTrue(0.18 < elapsed < 0.9, elapsed)
+        self.assertTrue(0.18 < elapsed < 0.5, elapsed)
 
         _overlapped.SetEvent(event)
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list