[Python-checkins] bpo-32309: Implement asyncio.to_thread() (GH-20143)

Kyle Stanley webhook-mailer at python.org
Mon May 18 23:03:36 EDT 2020


https://github.com/python/cpython/commit/cc2bbc2227c3f5ed9d8f6b3bd052e6f9e68279d2
commit: cc2bbc2227c3f5ed9d8f6b3bd052e6f9e68279d2
branch: master
author: Kyle Stanley <aeros167 at gmail.com>
committer: GitHub <noreply at github.com>
date: 2020-05-18T20:03:28-07:00
summary:

bpo-32309: Implement asyncio.to_thread() (GH-20143)



Implements `asyncio.to_thread`, a coroutine for asynchronously running IO-bound functions in a separate thread without blocking the event loop. See the discussion starting from [here](https://github.com/python/cpython/pull/18410#issuecomment-628930973) in GH-18410 for context.

Automerge-Triggered-By: @aeros

files:
A Lib/asyncio/threads.py
A Lib/test/test_asyncio/test_threads.py
A Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst
M Doc/library/asyncio-api-index.rst
M Doc/library/asyncio-task.rst
M Doc/whatsnew/3.9.rst
M Lib/asyncio/__init__.py

diff --git a/Doc/library/asyncio-api-index.rst b/Doc/library/asyncio-api-index.rst
index d5b5659abc65e..047e5bbc58cca 100644
--- a/Doc/library/asyncio-api-index.rst
+++ b/Doc/library/asyncio-api-index.rst
@@ -48,6 +48,9 @@ await on multiple things with timeouts.
     * - :class:`Task`
       - Task object.
 
+    * - :func:`to_thread`
+      - Asynchronously run a function in a separate OS thread.
+
     * - :func:`run_coroutine_threadsafe`
       - Schedule a coroutine from another OS thread.
 
diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst
index 2e963398d9300..7c2704090551b 100644
--- a/Doc/library/asyncio-task.rst
+++ b/Doc/library/asyncio-task.rst
@@ -602,6 +602,62 @@ Waiting Primitives
            # ...
 
 
+Running in Threads
+==================
+
+.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs)
+
+   Asynchronously run function *func* in a separate thread.
+
+   Any \*args and \*\*kwargs supplied for this function are directly passed
+   to *func*.
+
+   Return a coroutine that can be awaited to get the eventual result of
+   *func*.
+
+   This coroutine function is primarily intended to be used for executing
+   IO-bound functions/methods that would otherwise block the event loop if
+   they were run in the main thread. For example::
+
+       def blocking_io():
+           print(f"start blocking_io at {time.strftime('%X')}")
+           # Note that time.sleep() can be replaced with any blocking
+           # IO-bound operation, such as file operations.
+           time.sleep(1)
+           print(f"blocking_io complete at {time.strftime('%X')}")
+
+       async def main():
+           print(f"started main at {time.strftime('%X')}")
+
+           await asyncio.gather(
+               asyncio.to_thread(blocking_io),
+               asyncio.sleep(1))
+
+           print(f"finished main at {time.strftime('%X')}")
+
+
+       asyncio.run(main())
+
+       # Expected output:
+       #
+       # started main at 19:50:53
+       # start blocking_io at 19:50:53
+       # blocking_io complete at 19:50:54
+       # finished main at 19:50:54
+
+   Directly calling ``blocking_io()`` in any coroutine would block the event loop
+   for its duration, resulting in an additional 1 second of run time. Instead,
+   by using ``asyncio.to_thread()``, we can run it in a separate thread without
+   blocking the event loop.
+
+   .. note::
+
+      Due to the :term:`GIL`, ``asyncio.to_thread()`` can typically only be used
+      to make IO-bound functions non-blocking. However, for extension modules
+      that release the GIL or alternative Python implementations that don't
+      have one, ``asyncio.to_thread()`` can also be used for CPU-bound functions.
+
+
 Scheduling From Other Threads
 =============================
 
diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst
index 593f523828703..037e1055c79e5 100644
--- a/Doc/whatsnew/3.9.rst
+++ b/Doc/whatsnew/3.9.rst
@@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the
 Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
 implementation that polls process file descriptors. (:issue:`38692`)
 
+Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
+running IO-bound functions in a separate thread to avoid blocking the event
+loop, and essentially works as a high-level version of
+:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
+(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.)
+
 compileall
 ----------
 
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
index 28c2e2c429f34..eb84bfb189ccf 100644
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -17,6 +17,7 @@
 from .streams import *
 from .subprocess import *
 from .tasks import *
+from .threads import *
 from .transports import *
 
 # Exposed for _asynciomodule.c to implement now deprecated
@@ -35,6 +36,7 @@
            streams.__all__ +
            subprocess.__all__ +
            tasks.__all__ +
+           threads.__all__ +
            transports.__all__)
 
 if sys.platform == 'win32':  # pragma: no cover
diff --git a/Lib/asyncio/threads.py b/Lib/asyncio/threads.py
new file mode 100644
index 0000000000000..2f40467fe5bc7
--- /dev/null
+++ b/Lib/asyncio/threads.py
@@ -0,0 +1,21 @@
+"""High-level support for working with threads in asyncio"""
+
+import functools
+
+from . import events
+
+
+__all__ = "to_thread",
+
+
+async def to_thread(func, /, *args, **kwargs):
+    """Asynchronously run function *func* in a separate thread.
+
+    Any *args and **kwargs supplied for this function are directly passed
+    to *func*.
+
+    Return a coroutine that can be awaited to get the eventual result of *func*.
+    """
+    loop = events.get_running_loop()
+    func_call = functools.partial(func, *args, **kwargs)
+    return await loop.run_in_executor(None, func_call)
diff --git a/Lib/test/test_asyncio/test_threads.py b/Lib/test/test_asyncio/test_threads.py
new file mode 100644
index 0000000000000..99a00f21832f3
--- /dev/null
+++ b/Lib/test/test_asyncio/test_threads.py
@@ -0,0 +1,79 @@
+"""Tests for asyncio/threads.py"""
+
+import asyncio
+import unittest
+
+from unittest import mock
+from test.test_asyncio import utils as test_utils
+
+
+def tearDownModule():
+    asyncio.set_event_loop_policy(None)
+
+
+class ToThreadTests(test_utils.TestCase):
+    def setUp(self):
+        super().setUp()
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
+
+    def tearDown(self):
+        self.loop.run_until_complete(
+            self.loop.shutdown_default_executor())
+        self.loop.close()
+        asyncio.set_event_loop(None)
+        self.loop = None
+        super().tearDown()
+
+    def test_to_thread(self):
+        async def main():
+            return await asyncio.to_thread(sum, [40, 2])
+
+        result = self.loop.run_until_complete(main())
+        self.assertEqual(result, 42)
+
+    def test_to_thread_exception(self):
+        def raise_runtime():
+            raise RuntimeError("test")
+
+        async def main():
+            await asyncio.to_thread(raise_runtime)
+
+        with self.assertRaisesRegex(RuntimeError, "test"):
+            self.loop.run_until_complete(main())
+
+    def test_to_thread_once(self):
+        func = mock.Mock()
+
+        async def main():
+            await asyncio.to_thread(func)
+
+        self.loop.run_until_complete(main())
+        func.assert_called_once()
+
+    def test_to_thread_concurrent(self):
+        func = mock.Mock()
+
+        async def main():
+            futs = []
+            for _ in range(10):
+                fut = asyncio.to_thread(func)
+                futs.append(fut)
+            await asyncio.gather(*futs)
+
+        self.loop.run_until_complete(main())
+        self.assertEqual(func.call_count, 10)
+
+    def test_to_thread_args_kwargs(self):
+        # Unlike run_in_executor(), to_thread() should directly accept kwargs.
+        func = mock.Mock()
+
+        async def main():
+            await asyncio.to_thread(func, 'test', something=True)
+
+        self.loop.run_until_complete(main())
+        func.assert_called_once_with('test', something=True)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst b/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst
new file mode 100644
index 0000000000000..6272c35edf4d5
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst
@@ -0,0 +1,4 @@
+Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
+running IO-bound functions in a separate thread to avoid blocking the event
+loop, and essentially works as a high-level version of
+:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
\ No newline at end of file



More information about the Python-checkins mailing list