[Python-checkins] bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (GH-14344)

Andrew Svetlov webhook-mailer at python.org
Sun Jun 30 05:55:08 EDT 2019


https://github.com/python/cpython/commit/0d671c04c39b52e44597491b893eb0b6c86b3d45
commit: 0d671c04c39b52e44597491b893eb0b6c86b3d45
branch: master
author: Andrew Svetlov <andrew.svetlov at gmail.com>
committer: GitHub <noreply at github.com>
date: 2019-06-30T12:54:59+03:00
summary:

bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (GH-14344)

files:
A Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
M Doc/library/asyncio-policy.rst
M Doc/library/asyncio-subprocess.rst
M Lib/asyncio/unix_events.py
M Lib/test/test_asyncio/test_subprocess.py
M Lib/test/test_asyncio/test_unix_events.py
M Lib/test/test_asyncio/utils.py
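
For orientation, here is a minimal sketch of the scenario this commit is meant to support: an event loop running in a non-main thread that spawns a subprocess. It is not part of the patch. The function names run_child() and run_in_worker_thread() are made up for illustration, and the sketch relies only on the public asyncio subprocess API with whatever child watcher the policy selects by default.

```python
import asyncio
import sys
import threading


async def run_child():
    # Spawn a short-lived Python child and capture its stdout.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hello')",
        stdout=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate()
    return proc.returncode, out


def run_in_worker_thread():
    """Run the event loop in a non-main thread and return its result."""
    result = {}

    def worker():
        # asyncio.run() creates and closes a fresh loop in this thread.
        result["value"] = asyncio.run(run_child())

    thread = threading.Thread(target=worker, name="loop-thread")
    thread.start()
    thread.join()
    return result["value"]


if __name__ == "__main__":
    returncode, output = run_in_worker_thread()
    print(returncode, output)
```

Note that the main thread runs no event loop here. According to the documentation changes below, the previous default (SafeChildWatcher) needs a running loop in the main thread for this to work.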

diff --git a/Doc/library/asyncio-policy.rst b/Doc/library/asyncio-policy.rst
index 6212df85dbc1..aa8f8f13eae0 100644
--- a/Doc/library/asyncio-policy.rst
+++ b/Doc/library/asyncio-policy.rst
@@ -117,6 +117,7 @@ asyncio ships with the following built-in policies:
 
    .. availability:: Windows.
 
+.. _asyncio-watchers:
 
 Process Watchers
 ================
@@ -129,10 +130,11 @@ In asyncio, child processes are created with
 :func:`create_subprocess_exec` and :meth:`loop.subprocess_exec`
 functions.
 
-asyncio defines the :class:`AbstractChildWatcher` abstract base class,
-which child watchers should implement, and has two different
-implementations: :class:`SafeChildWatcher` (configured to be used
-by default) and :class:`FastChildWatcher`.
+asyncio defines the :class:`AbstractChildWatcher` abstract base class, which child
+watchers should implement, and has four different implementations:
+:class:`ThreadedChildWatcher` (configured to be used by default),
+:class:`MultiLoopChildWatcher`, :class:`SafeChildWatcher`, and
+:class:`FastChildWatcher`.
 
 See also the :ref:`Subprocess and Threads <asyncio-subprocess-threads>`
 section.
@@ -184,6 +186,15 @@ implementation used by the asyncio event loop:
 
       Note: loop may be ``None``.
 
+   .. method:: is_active()
+
+      Return ``True`` if the watcher is ready to use.
+
+      Spawning a subprocess with an *inactive* current child watcher raises
+      :exc:`RuntimeError`.
+
+      .. versionadded:: 3.8
+
    .. method:: close()
 
       Close the watcher.
@@ -191,16 +202,48 @@ implementation used by the asyncio event loop:
       This method has to be called to ensure that underlying
       resources are cleaned-up.
 
-.. class:: SafeChildWatcher
+.. class:: ThreadedChildWatcher
+
+   This implementation starts a new waiting thread for every subprocess spawn.
+
+   It works reliably even when the asyncio event loop is run in a non-main OS thread.
+
+   There is no noticeable overhead when handling a big number of children (*O(1)* each
+   time a child terminates), but starting a thread per process requires extra memory.
+
+   This watcher is used by default.
+
+   .. versionadded:: 3.8
 
-   This implementation avoids disrupting other code spawning processes
+.. class:: MultiLoopChildWatcher
+
+   This implementation registers a :py:data:`SIGCHLD` signal handler on
+   instantiation. That can break third-party code that installs a custom handler for
+   the :py:data:`SIGCHLD` signal.
+
+   The watcher avoids disrupting other code spawning processes
    by polling every process explicitly on a :py:data:`SIGCHLD` signal.
 
-   This is a safe solution but it has a significant overhead when
+   There is no limitation for running subprocesses from different threads once the
+   watcher is installed.
+
+   The solution is safe but it has a significant overhead when
    handling a big number of processes (*O(n)* each time a
    :py:data:`SIGCHLD` is received).
 
-   asyncio uses this safe implementation by default.
+   .. versionadded:: 3.8
+
+.. class:: SafeChildWatcher
+
+   This implementation uses the active event loop from the main thread to handle the
+   :py:data:`SIGCHLD` signal. If the main thread has no running event loop, another
+   thread cannot spawn a subprocess (:exc:`RuntimeError` is raised).
+
+   The watcher avoids disrupting other code spawning processes
+   by polling every process explicitly on a :py:data:`SIGCHLD` signal.
+
+   This solution is as safe as :class:`MultiLoopChildWatcher` and has the same *O(n)*
+   complexity, but requires a running event loop in the main thread to work.
 
 .. class:: FastChildWatcher
 
@@ -211,6 +254,9 @@ implementation used by the asyncio event loop:
    There is no noticeable overhead when handling a big number of
    children (*O(1)* each time a child terminates).
 
+   This solution requires a running event loop in the main thread to work, as
+   :class:`SafeChildWatcher` does.
+
 
 Custom Policies
 ===============
diff --git a/Doc/library/asyncio-subprocess.rst b/Doc/library/asyncio-subprocess.rst
index 00dc66c48b21..444fb6361b5e 100644
--- a/Doc/library/asyncio-subprocess.rst
+++ b/Doc/library/asyncio-subprocess.rst
@@ -293,18 +293,26 @@ their completion.
 Subprocess and Threads
 ----------------------
 
-Standard asyncio event loop supports running subprocesses from
-different threads, but there are limitations:
+Standard asyncio event loop supports running subprocesses from different threads by
+default.
 
-* An event loop must run in the main thread.
+On Windows, subprocesses are provided by :class:`ProactorEventLoop` only (default);
+:class:`SelectorEventLoop` has no subprocess support.
 
-* The child watcher must be instantiated in the main thread
-  before executing subprocesses from other threads. Call the
-  :func:`get_child_watcher` function in the main thread to instantiate
-  the child watcher.
+On UNIX, *child watchers* are used to wait for subprocess termination; see
+:ref:`asyncio-watchers` for more info.
 
-Note that alternative event loop implementations might not share
-the above limitations; please refer to their documentation.
+
+.. versionchanged:: 3.8
+
+   UNIX switched to use :class:`ThreadedChildWatcher` for spawning subprocesses from
+   different threads without any limitation.
+
+   Spawning a subprocess with an *inactive* current child watcher raises
+   :exc:`RuntimeError`.
+
+Note that alternative event loop implementations might have their own limitations;
+please refer to their documentation.
 
 .. seealso::
 
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 28128d2977df..d7a4af86f71b 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -2,6 +2,7 @@
 
 import errno
 import io
+import itertools
 import os
 import selectors
 import signal
@@ -12,7 +13,6 @@
 import threading
 import warnings
 
-
 from . import base_events
 from . import base_subprocess
 from . import constants
@@ -29,7 +29,9 @@
 __all__ = (
     'SelectorEventLoop',
     'AbstractChildWatcher', 'SafeChildWatcher',
-    'FastChildWatcher', 'DefaultEventLoopPolicy',
+    'FastChildWatcher',
+    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
+    'DefaultEventLoopPolicy',
 )
 
 
@@ -184,6 +186,13 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                          stdin, stdout, stderr, bufsize,
                                          extra=None, **kwargs):
         with events.get_child_watcher() as watcher:
+            if not watcher.is_active():
+                # Check early.
+                # Raising exception before process creation
+                # prevents subprocess execution if the watcher
+                # is not ready to handle it.
+                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
+                                   "subprocess support is not installed.")
             waiter = self.create_future()
             transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                               stdin, stdout, stderr, bufsize,
@@ -838,6 +847,15 @@ def close(self):
         """
         raise NotImplementedError()
 
+    def is_active(self):
+        """Return ``True`` if the watcher is active and is used by the event loop.
+
+        Return True if the watcher is installed and ready to handle process exit
+        notifications.
+
+        """
+        raise NotImplementedError()
+
     def __enter__(self):
         """Enter the watcher's context and allow starting new processes
 
@@ -849,6 +867,20 @@ def __exit__(self, a, b, c):
         raise NotImplementedError()
 
 
+def _compute_returncode(status):
+    if os.WIFSIGNALED(status):
+        # The child process died because of a signal.
+        return -os.WTERMSIG(status)
+    elif os.WIFEXITED(status):
+        # The child process exited (e.g sys.exit()).
+        return os.WEXITSTATUS(status)
+    else:
+        # The child exited, but we don't understand its status.
+        # This shouldn't happen, but if it does, let's just
+        # return that status; perhaps that helps debug it.
+        return status
+
+
 class BaseChildWatcher(AbstractChildWatcher):
 
     def __init__(self):
@@ -858,6 +890,9 @@ def __init__(self):
     def close(self):
         self.attach_loop(None)
 
+    def is_active(self):
+        return self._loop is not None and self._loop.is_running()
+
     def _do_waitpid(self, expected_pid):
         raise NotImplementedError()
 
@@ -898,19 +933,6 @@ def _sig_chld(self):
                 'exception': exc,
             })
 
-    def _compute_returncode(self, status):
-        if os.WIFSIGNALED(status):
-            # The child process died because of a signal.
-            return -os.WTERMSIG(status)
-        elif os.WIFEXITED(status):
-            # The child process exited (e.g sys.exit()).
-            return os.WEXITSTATUS(status)
-        else:
-            # The child exited, but we don't understand its status.
-            # This shouldn't happen, but if it does, let's just
-            # return that status; perhaps that helps debug it.
-            return status
-
 
 class SafeChildWatcher(BaseChildWatcher):
     """'Safe' child watcher implementation.
@@ -934,11 +956,6 @@ def __exit__(self, a, b, c):
         pass
 
     def add_child_handler(self, pid, callback, *args):
-        if self._loop is None:
-            raise RuntimeError(
-                "Cannot add child handler, "
-                "the child watcher does not have a loop attached")
-
         self._callbacks[pid] = (callback, args)
 
         # Prevent a race condition in case the child is already terminated.
@@ -974,7 +991,7 @@ def _do_waitpid(self, expected_pid):
                 # The child process is still alive.
                 return
 
-            returncode = self._compute_returncode(status)
+            returncode = _compute_returncode(status)
             if self._loop.get_debug():
                 logger.debug('process %s exited with returncode %s',
                              expected_pid, returncode)
@@ -1035,11 +1052,6 @@ def __exit__(self, a, b, c):
     def add_child_handler(self, pid, callback, *args):
         assert self._forks, "Must use the context manager"
 
-        if self._loop is None:
-            raise RuntimeError(
-                "Cannot add child handler, "
-                "the child watcher does not have a loop attached")
-
         with self._lock:
             try:
                 returncode = self._zombies.pop(pid)
@@ -1072,7 +1084,7 @@ def _do_waitpid_all(self):
                     # A child process is still alive.
                     return
 
-                returncode = self._compute_returncode(status)
+                returncode = _compute_returncode(status)
 
             with self._lock:
                 try:
@@ -1101,6 +1113,209 @@ def _do_waitpid_all(self):
                 callback(pid, returncode, *args)
 
 
+class MultiLoopChildWatcher(AbstractChildWatcher):
+    """A watcher that doesn't require running loop in the main thread.
+
+    This implementation registers a SIGCHLD signal handler on
+    instantiation (which may conflict with other code that
+    installs its own handler for this signal).
+
+    The solution is safe but it has a significant overhead when
+    handling a big number of processes (*O(n)* each time a
+    SIGCHLD is received).
+    """
+
+    # Implementation note:
+    # The class keeps compatibility with AbstractChildWatcher ABC
+    # To achieve this it has empty attach_loop() method
+    # and doesn't accept explicit loop argument
+    # for add_child_handler()/remove_child_handler()
+    # but retrieves the current loop by get_running_loop()
+
+    def __init__(self):
+        self._callbacks = {}
+        self._saved_sighandler = None
+
+    def is_active(self):
+        return self._saved_sighandler is not None
+
+    def close(self):
+        self._callbacks.clear()
+        if self._saved_sighandler is not None:
+            handler = signal.getsignal(signal.SIGCHLD)
+            if handler != self._sig_chld:
+                logger.warning("SIGCHLD handler was changed by outside code")
+            else:
+                signal.signal(signal.SIGCHLD, self._saved_sighandler)
+            self._saved_sighandler = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+    def add_child_handler(self, pid, callback, *args):
+        loop = events.get_running_loop()
+        self._callbacks[pid] = (loop, callback, args)
+
+        # Prevent a race condition in case the child is already terminated.
+        self._do_waitpid(pid)
+
+    def remove_child_handler(self, pid):
+        try:
+            del self._callbacks[pid]
+            return True
+        except KeyError:
+            return False
+
+    def attach_loop(self, loop):
+        # Don't save the loop but initialize itself if called first time
+        # The reason to do it here is that attach_loop() is called from
+        # unix policy only for the main thread.
+        # Main thread is required for subscription on SIGCHLD signal
+        if self._saved_sighandler is None:
+            self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
+            if self._saved_sighandler is None:
+                logger.warning("Previous SIGCHLD handler was set by non-Python code, "
+                               "restore to default handler on watcher close.")
+                self._saved_sighandler = signal.SIG_DFL
+
+            # Set SA_RESTART to limit EINTR occurrences.
+            signal.siginterrupt(signal.SIGCHLD, False)
+
+    def _do_waitpid_all(self):
+        for pid in list(self._callbacks):
+            self._do_waitpid(pid)
+
+    def _do_waitpid(self, expected_pid):
+        assert expected_pid > 0
+
+        try:
+            pid, status = os.waitpid(expected_pid, os.WNOHANG)
+        except ChildProcessError:
+            # The child process is already reaped
+            # (may happen if waitpid() is called elsewhere).
+            pid = expected_pid
+            returncode = 255
+            logger.warning(
+                "Unknown child process pid %d, will report returncode 255",
+                pid)
+            debug_log = False
+        else:
+            if pid == 0:
+                # The child process is still alive.
+                return
+
+            returncode = _compute_returncode(status)
+            debug_log = True
+        try:
+            loop, callback, args = self._callbacks.pop(pid)
+        except KeyError:  # pragma: no cover
+            # May happen if .remove_child_handler() is called
+            # after os.waitpid() returns.
+            logger.warning("Child watcher got an unexpected pid: %r",
+                           pid, exc_info=True)
+        else:
+            if loop.is_closed():
+                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
+            else:
+                if debug_log and loop.get_debug():
+                    logger.debug('process %s exited with returncode %s',
+                                 expected_pid, returncode)
+                loop.call_soon_threadsafe(callback, pid, returncode, *args)
+
+    def _sig_chld(self, signum, frame):
+        try:
+            self._do_waitpid_all()
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except BaseException:
+            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
+
+
+class ThreadedChildWatcher(AbstractChildWatcher):
+    """Threaded child watcher implementation.
+
+    The watcher uses a thread per process
+    to wait for the process to finish.
+
+    It doesn't require subscribing to a POSIX signal,
+    but creating a thread is not free.
+
+    The watcher has O(1) complexity; its performance doesn't depend
+    on the number of spawned processes.
+    """
+
+    def __init__(self):
+        self._pid_counter = itertools.count(0)
+        self._threads = {}
+
+    def is_active(self):
+        return True
+
+    def close(self):
+        pass
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+    def __del__(self, _warn=warnings.warn):
+        threads = [thread for thread in list(self._threads.values())
+                   if thread.is_alive()]
+        if threads:
+            _warn(f"{self.__class__} has registered but not finished child processes",
+                  ResourceWarning,
+                  source=self)
+
+    def add_child_handler(self, pid, callback, *args):
+        loop = events.get_running_loop()
+        thread = threading.Thread(target=self._do_waitpid,
+                                  name=f"waitpid-{next(self._pid_counter)}",
+                                  args=(loop, pid, callback, args),
+                                  daemon=True)
+        self._threads[pid] = thread
+        thread.start()
+
+    def remove_child_handler(self, pid):
+        # asyncio never calls remove_child_handler() !!!
+        # The method is a no-op but is implemented because
+        # the abstract base class requires it
+        return True
+
+    def attach_loop(self, loop):
+        pass
+
+    def _do_waitpid(self, loop, expected_pid, callback, args):
+        assert expected_pid > 0
+
+        try:
+            pid, status = os.waitpid(expected_pid, 0)
+        except ChildProcessError:
+            # The child process is already reaped
+            # (may happen if waitpid() is called elsewhere).
+            pid = expected_pid
+            returncode = 255
+            logger.warning(
+                "Unknown child process pid %d, will report returncode 255",
+                pid)
+        else:
+            returncode = _compute_returncode(status)
+            if loop.get_debug():
+                logger.debug('process %s exited with returncode %s',
+                             expected_pid, returncode)
+
+        if loop.is_closed():
+            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
+        else:
+            loop.call_soon_threadsafe(callback, pid, returncode, *args)
+
+        self._threads.pop(expected_pid)
+
+
 class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
     """UNIX event loop policy with a watcher for child processes."""
     _loop_factory = _UnixSelectorEventLoop
@@ -1112,7 +1327,7 @@ def __init__(self):
     def _init_watcher(self):
         with events._lock:
             if self._watcher is None:  # pragma: no branch
-                self._watcher = SafeChildWatcher()
+                self._watcher = ThreadedChildWatcher()
                 if isinstance(threading.current_thread(),
                               threading._MainThread):
                     self._watcher.attach_loop(self._local._loop)
@@ -1134,7 +1349,7 @@ def set_event_loop(self, loop):
     def get_child_watcher(self):
         """Get the watcher for child processes.
 
-        If not yet set, a SafeChildWatcher object is automatically created.
+        If not yet set, a ThreadedChildWatcher object is automatically created.
         """
         if self._watcher is None:
             self._init_watcher()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index e9a9e50430c3..b9578b2866c0 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -633,6 +633,7 @@ def test_create_subprocess_exec_with_path(self):
 
         self.assertIsNone(self.loop.run_until_complete(execute()))
 
+
 if sys.platform != 'win32':
     # Unix
     class SubprocessWatcherMixin(SubprocessMixin):
@@ -648,7 +649,24 @@ def setUp(self):
             watcher = self.Watcher()
             watcher.attach_loop(self.loop)
             policy.set_child_watcher(watcher)
-            self.addCleanup(policy.set_child_watcher, None)
+
+        def tearDown(self):
+            super().tearDown()
+            policy = asyncio.get_event_loop_policy()
+            watcher = policy.get_child_watcher()
+            policy.set_child_watcher(None)
+            watcher.attach_loop(None)
+            watcher.close()
+
+    class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
+                                         test_utils.TestCase):
+
+        Watcher = unix_events.ThreadedChildWatcher
+
+    class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
+                                          test_utils.TestCase):
+
+        Watcher = unix_events.MultiLoopChildWatcher
 
     class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                      test_utils.TestCase):
@@ -670,5 +688,25 @@ def setUp(self):
             self.set_event_loop(self.loop)
 
 
+class GenericWatcherTests:
+
+    def test_create_subprocess_fails_with_inactive_watcher(self):
+
+        async def execute():
+            watcher = mock.create_autospec(asyncio.AbstractChildWatcher)
+            watcher.is_active.return_value = False
+            asyncio.set_child_watcher(watcher)
+
+            with self.assertRaises(RuntimeError):
+                await subprocess.create_subprocess_exec(
+                    support.FakePath(sys.executable), '-c', 'pass')
+
+            watcher.add_child_handler.assert_not_called()
+
+        self.assertIsNone(self.loop.run_until_complete(execute()))
+
+
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 5c610cdd67ba..462a8b3c7859 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -1082,6 +1082,8 @@ def test_not_implemented(self):
             NotImplementedError, watcher.attach_loop, f)
         self.assertRaises(
             NotImplementedError, watcher.close)
+        self.assertRaises(
+            NotImplementedError, watcher.is_active)
         self.assertRaises(
             NotImplementedError, watcher.__enter__)
         self.assertRaises(
@@ -1784,15 +1786,6 @@ def test_close(self, m):
                 if isinstance(self.watcher, asyncio.FastChildWatcher):
                     self.assertFalse(self.watcher._zombies)
 
-    @waitpid_mocks
-    def test_add_child_handler_with_no_loop_attached(self, m):
-        callback = mock.Mock()
-        with self.create_watcher() as watcher:
-            with self.assertRaisesRegex(
-                    RuntimeError,
-                    'the child watcher does not have a loop attached'):
-                watcher.add_child_handler(100, callback)
-
 
 class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
     def create_watcher(self):
@@ -1809,17 +1802,16 @@ class PolicyTests(unittest.TestCase):
     def create_policy(self):
         return asyncio.DefaultEventLoopPolicy()
 
-    def test_get_child_watcher(self):
+    def test_get_default_child_watcher(self):
         policy = self.create_policy()
         self.assertIsNone(policy._watcher)
 
         watcher = policy.get_child_watcher()
-        self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
+        self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
 
         self.assertIs(policy._watcher, watcher)
 
         self.assertIs(watcher, policy.get_child_watcher())
-        self.assertIsNone(watcher._loop)
 
     def test_get_child_watcher_after_set(self):
         policy = self.create_policy()
@@ -1829,18 +1821,6 @@ def test_get_child_watcher_after_set(self):
         self.assertIs(policy._watcher, watcher)
         self.assertIs(watcher, policy.get_child_watcher())
 
-    def test_get_child_watcher_with_mainloop_existing(self):
-        policy = self.create_policy()
-        loop = policy.get_event_loop()
-
-        self.assertIsNone(policy._watcher)
-        watcher = policy.get_child_watcher()
-
-        self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
-        self.assertIs(watcher._loop, loop)
-
-        loop.close()
-
     def test_get_child_watcher_thread(self):
 
         def f():
@@ -1866,7 +1846,11 @@ def test_child_watcher_replace_mainloop_existing(self):
         policy = self.create_policy()
         loop = policy.get_event_loop()
 
-        watcher = policy.get_child_watcher()
+        # Explicitly setup SafeChildWatcher,
+        # default ThreadedChildWatcher has no _loop property
+        watcher = asyncio.SafeChildWatcher()
+        policy.set_child_watcher(watcher)
+        watcher.attach_loop(loop)
 
         self.assertIs(watcher._loop, loop)
 
diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py
index cb373d544f41..5b4bb123a9ec 100644
--- a/Lib/test/test_asyncio/utils.py
+++ b/Lib/test/test_asyncio/utils.py
@@ -1,5 +1,6 @@
 """Utilities shared by tests."""
 
+import asyncio
 import collections
 import contextlib
 import io
@@ -512,6 +513,18 @@ def close_loop(loop):
         if executor is not None:
             executor.shutdown(wait=True)
         loop.close()
+        policy = support.maybe_get_event_loop_policy()
+        if policy is not None:
+            try:
+                watcher = policy.get_child_watcher()
+            except NotImplementedError:
+                # watcher is not implemented by EventLoopPolicy, e.g. Windows
+                pass
+            else:
+                if isinstance(watcher, asyncio.ThreadedChildWatcher):
+                    threads = list(watcher._threads.values())
+                    for thread in threads:
+                        thread.join()
 
     def set_event_loop(self, loop, *, cleanup=True):
         assert loop is not None
diff --git a/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
new file mode 100644
index 000000000000..c492e1de6d5c
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
@@ -0,0 +1,2 @@
+Support running asyncio subprocesses when the event loop is executed in a non-main
+thread on UNIX.
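
The thread-per-child technique behind ThreadedChildWatcher can be sketched on its own, outside asyncio's watcher machinery. The following is a simplified illustration, not the code from the patch: compute_returncode() mirrors the _compute_returncode() helper above, while watch_child() and spawn_and_wait() are hypothetical names. It is UNIX-only because it uses os.WIFSIGNALED() and related functions, and it omits the closed-loop and already-reaped-child handling that the real watcher performs.

```python
import asyncio
import os
import sys
import threading


def compute_returncode(status):
    # Same decoding as the _compute_returncode() helper in the patch.
    if os.WIFSIGNALED(status):
        # The child was killed by a signal: report it as a negative number.
        return -os.WTERMSIG(status)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    return status


def watch_child(loop, pid, future):
    # Runs in its own thread: block until this one child exits.
    _, status = os.waitpid(pid, 0)
    returncode = compute_returncode(status)
    # Futures are not thread-safe, so hand the result to the loop thread.
    loop.call_soon_threadsafe(future.set_result, returncode)


async def spawn_and_wait(exit_code):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # With os.P_NOWAIT, os.spawnv() returns the child's pid on UNIX.
    pid = os.spawnv(
        os.P_NOWAIT, sys.executable,
        [sys.executable, "-c", f"import sys; sys.exit({exit_code})"],
    )
    threading.Thread(target=watch_child, args=(loop, pid, future),
                     name=f"waitpid-{pid}", daemon=True).start()
    return await future


if __name__ == "__main__":
    print(asyncio.run(spawn_and_wait(3)))
```

Because each waiting thread blocks on exactly one pid, no SIGCHLD handler is needed and the work per child exit does not grow with the number of children. The cost is one extra thread per running child.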


