[Python-checkins] Revert "bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (#13630)" (GH-13793)

Miss Islington (bot) webhook-mailer at python.org
Mon Jun 3 20:09:23 EDT 2019


https://github.com/python/cpython/commit/9535aff9421f0a5639f6e4c4bb0f07a743ea8dba
commit: 9535aff9421f0a5639f6e4c4bb0f07a743ea8dba
branch: master
author: Andrew Svetlov <andrew.svetlov at gmail.com>
committer: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
date: 2019-06-03T17:09:19-07:00
summary:

Revert "bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (#13630)" (GH-13793)



https://bugs.python.org/issue35621

files:
D Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
M Lib/asyncio/unix_events.py
M Lib/test/test_asyncio/test_subprocess.py
M Lib/test/test_asyncio/test_unix_events.py

diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index b943845d9363..28128d2977df 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -2,7 +2,6 @@
 
 import errno
 import io
-import itertools
 import os
 import selectors
 import signal
@@ -30,9 +29,7 @@
 __all__ = (
     'SelectorEventLoop',
     'AbstractChildWatcher', 'SafeChildWatcher',
-    'FastChildWatcher',
-    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
-    'DefaultEventLoopPolicy',
+    'FastChildWatcher', 'DefaultEventLoopPolicy',
 )
 
 
@@ -187,13 +184,6 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                          stdin, stdout, stderr, bufsize,
                                          extra=None, **kwargs):
         with events.get_child_watcher() as watcher:
-            if not watcher.is_active():
-                # Check early.
-                # Raising exception before process creation
-                # prevents subprocess execution if the watcher
-                # is not ready to handle it.
-                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
-                                   "subprocess support is not installed.")
             waiter = self.create_future()
             transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                               stdin, stdout, stderr, bufsize,
@@ -848,15 +838,6 @@ def close(self):
         """
         raise NotImplementedError()
 
-    def is_active(self):
-        """Watcher status.
-
-        Return True if the watcher is installed and ready to handle process exit
-        notifications.
-
-        """
-        raise NotImplementedError()
-
     def __enter__(self):
         """Enter the watcher's context and allow starting new processes
 
@@ -868,20 +849,6 @@ def __exit__(self, a, b, c):
         raise NotImplementedError()
 
 
-def _compute_returncode(status):
-    if os.WIFSIGNALED(status):
-        # The child process died because of a signal.
-        return -os.WTERMSIG(status)
-    elif os.WIFEXITED(status):
-        # The child process exited (e.g sys.exit()).
-        return os.WEXITSTATUS(status)
-    else:
-        # The child exited, but we don't understand its status.
-        # This shouldn't happen, but if it does, let's just
-        # return that status; perhaps that helps debug it.
-        return status
-
-
 class BaseChildWatcher(AbstractChildWatcher):
 
     def __init__(self):
@@ -891,9 +858,6 @@ def __init__(self):
     def close(self):
         self.attach_loop(None)
 
-    def is_active(self):
-        return self._loop is not None and self._loop.is_running()
-
     def _do_waitpid(self, expected_pid):
         raise NotImplementedError()
 
@@ -934,6 +898,19 @@ def _sig_chld(self):
                 'exception': exc,
             })
 
+    def _compute_returncode(self, status):
+        if os.WIFSIGNALED(status):
+            # The child process died because of a signal.
+            return -os.WTERMSIG(status)
+        elif os.WIFEXITED(status):
+            # The child process exited (e.g sys.exit()).
+            return os.WEXITSTATUS(status)
+        else:
+            # The child exited, but we don't understand its status.
+            # This shouldn't happen, but if it does, let's just
+            # return that status; perhaps that helps debug it.
+            return status
+
 
 class SafeChildWatcher(BaseChildWatcher):
     """'Safe' child watcher implementation.
@@ -957,6 +934,11 @@ def __exit__(self, a, b, c):
         pass
 
     def add_child_handler(self, pid, callback, *args):
+        if self._loop is None:
+            raise RuntimeError(
+                "Cannot add child handler, "
+                "the child watcher does not have a loop attached")
+
         self._callbacks[pid] = (callback, args)
 
         # Prevent a race condition in case the child is already terminated.
@@ -992,7 +974,7 @@ def _do_waitpid(self, expected_pid):
                 # The child process is still alive.
                 return
 
-            returncode = _compute_returncode(status)
+            returncode = self._compute_returncode(status)
             if self._loop.get_debug():
                 logger.debug('process %s exited with returncode %s',
                              expected_pid, returncode)
@@ -1053,6 +1035,11 @@ def __exit__(self, a, b, c):
     def add_child_handler(self, pid, callback, *args):
         assert self._forks, "Must use the context manager"
 
+        if self._loop is None:
+            raise RuntimeError(
+                "Cannot add child handler, "
+                "the child watcher does not have a loop attached")
+
         with self._lock:
             try:
                 returncode = self._zombies.pop(pid)
@@ -1085,7 +1072,7 @@ def _do_waitpid_all(self):
                     # A child process is still alive.
                     return
 
-                returncode = _compute_returncode(status)
+                returncode = self._compute_returncode(status)
 
             with self._lock:
                 try:
@@ -1114,177 +1101,6 @@ def _do_waitpid_all(self):
                 callback(pid, returncode, *args)
 
 
-class MultiLoopChildWatcher(AbstractChildWatcher):
-    # The class keeps compatibility with AbstractChildWatcher ABC
-    # To achieve this it has empty attach_loop() method
-    # and doesn't accept explicit loop argument
-    # for add_child_handler()/remove_child_handler()
-    # but retrieves the current loop by get_running_loop()
-
-    def __init__(self):
-        self._callbacks = {}
-        self._saved_sighandler = None
-
-    def is_active(self):
-        return self._saved_sighandler is not None
-
-    def close(self):
-        self._callbacks.clear()
-        if self._saved_sighandler is not None:
-            handler = signal.getsignal(signal.SIGCHLD)
-            if handler != self._sig_chld:
-                logger.warning("SIGCHLD handler was changed by outside code")
-            else:
-                signal.signal(signal.SIGCHLD, self._saved_sighandler)
-            self._saved_sighandler = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        pass
-
-    def add_child_handler(self, pid, callback, *args):
-        loop = events.get_running_loop()
-        self._callbacks[pid] = (loop, callback, args)
-
-        # Prevent a race condition in case the child is already terminated.
-        self._do_waitpid(pid)
-
-    def remove_child_handler(self, pid):
-        try:
-            del self._callbacks[pid]
-            return True
-        except KeyError:
-            return False
-
-    def attach_loop(self, loop):
-        # Don't save the loop but initialize itself if called first time
-        # The reason to do it here is that attach_loop() is called from
-        # unix policy only for the main thread.
-        # Main thread is required for subscription on SIGCHLD signal
-        if self._saved_sighandler is None:
-            self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
-            if self._saved_sighandler is None:
-                logger.warning("Previous SIGCHLD handler was set by non-Python code, "
-                               "restore to default handler on watcher close.")
-                self._saved_sighandler = signal.SIG_DFL
-
-            # Set SA_RESTART to limit EINTR occurrences.
-            signal.siginterrupt(signal.SIGCHLD, False)
-
-    def _do_waitpid_all(self):
-        for pid in list(self._callbacks):
-            self._do_waitpid(pid)
-
-    def _do_waitpid(self, expected_pid):
-        assert expected_pid > 0
-
-        try:
-            pid, status = os.waitpid(expected_pid, os.WNOHANG)
-        except ChildProcessError:
-            # The child process is already reaped
-            # (may happen if waitpid() is called elsewhere).
-            pid = expected_pid
-            returncode = 255
-            logger.warning(
-                "Unknown child process pid %d, will report returncode 255",
-                pid)
-            debug_log = False
-        else:
-            if pid == 0:
-                # The child process is still alive.
-                return
-
-            returncode = _compute_returncode(status)
-            debug_log = True
-        try:
-            loop, callback, args = self._callbacks.pop(pid)
-        except KeyError:  # pragma: no cover
-            # May happen if .remove_child_handler() is called
-            # after os.waitpid() returns.
-            logger.warning("Child watcher got an unexpected pid: %r",
-                           pid, exc_info=True)
-        else:
-            if loop.is_closed():
-                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
-            else:
-                if debug_log and loop.get_debug():
-                    logger.debug('process %s exited with returncode %s',
-                                 expected_pid, returncode)
-                loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
-    def _sig_chld(self, signum, frame):
-        try:
-            self._do_waitpid_all()
-        except (SystemExit, KeyboardInterrupt):
-            raise
-        except BaseException:
-            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
-
-
-class ThreadedChildWatcher(AbstractChildWatcher):
-    # The watcher uses a thread per process
-    # for waiting for the process finish.
-    # It doesn't require subscription on POSIX signal
-
-    def __init__(self):
-        self._pid_counter = itertools.count(0)
-
-    def is_active(self):
-        return True
-
-    def close(self):
-        pass
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        pass
-
-    def add_child_handler(self, pid, callback, *args):
-        loop = events.get_running_loop()
-        thread = threading.Thread(target=self._do_waitpid,
-                                  name=f"waitpid-{next(self._pid_counter)}",
-                                  args=(loop, pid, callback, args),
-                                  daemon=True)
-        thread.start()
-
-    def remove_child_handler(self, pid):
-        # asyncio never calls remove_child_handler() !!!
-        # The method is no-op but is implemented because
-        # abstract base classe requires it
-        return True
-
-    def attach_loop(self, loop):
-        pass
-
-    def _do_waitpid(self, loop, expected_pid, callback, args):
-        assert expected_pid > 0
-
-        try:
-            pid, status = os.waitpid(expected_pid, 0)
-        except ChildProcessError:
-            # The child process is already reaped
-            # (may happen if waitpid() is called elsewhere).
-            pid = expected_pid
-            returncode = 255
-            logger.warning(
-                "Unknown child process pid %d, will report returncode 255",
-                pid)
-        else:
-            returncode = _compute_returncode(status)
-            if loop.get_debug():
-                logger.debug('process %s exited with returncode %s',
-                             expected_pid, returncode)
-
-        if loop.is_closed():
-            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
-        else:
-            loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
-
 class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
     """UNIX event loop policy with a watcher for child processes."""
     _loop_factory = _UnixSelectorEventLoop
@@ -1296,7 +1112,7 @@ def __init__(self):
     def _init_watcher(self):
         with events._lock:
             if self._watcher is None:  # pragma: no branch
-                self._watcher = ThreadedChildWatcher()
+                self._watcher = SafeChildWatcher()
                 if isinstance(threading.current_thread(),
                               threading._MainThread):
                     self._watcher.attach_loop(self._local._loop)
@@ -1318,7 +1134,7 @@ def set_event_loop(self, loop):
     def get_child_watcher(self):
         """Get the watcher for child processes.
 
-        If not yet set, a ThreadedChildWatcher object is automatically created.
+        If not yet set, a SafeChildWatcher object is automatically created.
         """
         if self._watcher is None:
             self._init_watcher()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index 582e17202460..7d72e6cde4e7 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -633,7 +633,6 @@ def test_create_subprocess_exec_with_path(self):
 
         self.assertIsNone(self.loop.run_until_complete(execute()))
 
-
 if sys.platform != 'win32':
     # Unix
     class SubprocessWatcherMixin(SubprocessMixin):
@@ -649,24 +648,7 @@ def setUp(self):
             watcher = self.Watcher()
             watcher.attach_loop(self.loop)
             policy.set_child_watcher(watcher)
-
-        def tearDown(self):
-            super().setUp()
-            policy = asyncio.get_event_loop_policy()
-            watcher = policy.get_child_watcher()
-            policy.set_child_watcher(None)
-            watcher.attach_loop(None)
-            watcher.close()
-
-    class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
-                                         test_utils.TestCase):
-
-        Watcher = unix_events.ThreadedChildWatcher
-
-    class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
-                                          test_utils.TestCase):
-
-        Watcher = unix_events.MultiLoopChildWatcher
+            self.addCleanup(policy.set_child_watcher, None)
 
     class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                      test_utils.TestCase):
@@ -688,25 +670,5 @@ def setUp(self):
             self.set_event_loop(self.loop)
 
 
-class GenericWatcherTests:
-
-    def test_create_subprocess_fails_with_inactive_watcher(self):
-
-        async def execute():
-            watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
-            watcher.is_active.return_value = False
-            asyncio.set_child_watcher(watcher)
-
-            with self.assertRaises(RuntimeError):
-                await subprocess.create_subprocess_exec(
-                    support.FakePath(sys.executable), '-c', 'pass')
-
-            watcher.add_child_handler.assert_not_called()
-
-        self.assertIsNone(self.loop.run_until_complete(execute()))
-
-
-
-
 if __name__ == '__main__':
     unittest.main()
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 462a8b3c7859..5c610cdd67ba 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -1082,8 +1082,6 @@ def test_not_implemented(self):
             NotImplementedError, watcher.attach_loop, f)
         self.assertRaises(
             NotImplementedError, watcher.close)
-        self.assertRaises(
-            NotImplementedError, watcher.is_active)
         self.assertRaises(
             NotImplementedError, watcher.__enter__)
         self.assertRaises(
@@ -1786,6 +1784,15 @@ def test_close(self, m):
                 if isinstance(self.watcher, asyncio.FastChildWatcher):
                     self.assertFalse(self.watcher._zombies)
 
+    @waitpid_mocks
+    def test_add_child_handler_with_no_loop_attached(self, m):
+        callback = mock.Mock()
+        with self.create_watcher() as watcher:
+            with self.assertRaisesRegex(
+                    RuntimeError,
+                    'the child watcher does not have a loop attached'):
+                watcher.add_child_handler(100, callback)
+
 
 class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
     def create_watcher(self):
@@ -1802,16 +1809,17 @@ class PolicyTests(unittest.TestCase):
     def create_policy(self):
         return asyncio.DefaultEventLoopPolicy()
 
-    def test_get_default_child_watcher(self):
+    def test_get_child_watcher(self):
         policy = self.create_policy()
         self.assertIsNone(policy._watcher)
 
         watcher = policy.get_child_watcher()
-        self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
+        self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
 
         self.assertIs(policy._watcher, watcher)
 
         self.assertIs(watcher, policy.get_child_watcher())
+        self.assertIsNone(watcher._loop)
 
     def test_get_child_watcher_after_set(self):
         policy = self.create_policy()
@@ -1821,6 +1829,18 @@ def test_get_child_watcher_after_set(self):
         self.assertIs(policy._watcher, watcher)
         self.assertIs(watcher, policy.get_child_watcher())
 
+    def test_get_child_watcher_with_mainloop_existing(self):
+        policy = self.create_policy()
+        loop = policy.get_event_loop()
+
+        self.assertIsNone(policy._watcher)
+        watcher = policy.get_child_watcher()
+
+        self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
+        self.assertIs(watcher._loop, loop)
+
+        loop.close()
+
     def test_get_child_watcher_thread(self):
 
         def f():
@@ -1846,11 +1866,7 @@ def test_child_watcher_replace_mainloop_existing(self):
         policy = self.create_policy()
         loop = policy.get_event_loop()
 
-        # Explicitly setup SafeChildWatcher,
-        # default ThreadedChildWatcher has no _loop property
-        watcher = asyncio.SafeChildWatcher()
-        policy.set_child_watcher(watcher)
-        watcher.attach_loop(loop)
+        watcher = policy.get_child_watcher()
 
         self.assertIs(watcher._loop, loop)
 
diff --git a/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
deleted file mode 100644
index c492e1de6d5c..000000000000
--- a/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
+++ /dev/null
@@ -1,2 +0,0 @@
-Support running asyncio subprocesses when execution event loop in a thread
-on UNIX.



More information about the Python-checkins mailing list