Better integration of multiprocessing with asyncio

I think it would be helpful for folks using the asyncio module to be able to make non-blocking calls to objects in the multiprocessing module more easily. While some use cases for multiprocessing can be replaced with ProcessPoolExecutor/run_in_executor, there are others that cannot; more advanced usages of multiprocessing.Pool aren't supported by ProcessPoolExecutor (initializer/initargs, contexts, etc.), and other multiprocessing classes like Lock and Queue have blocking methods that could be made into coroutines.

Consider this (extremely contrived, but use your imagination) example of an asyncio-friendly Queue:

    import asyncio
    import time
    from concurrent.futures import ProcessPoolExecutor

    def do_proc_work(q, val, val2):
        time.sleep(3)  # Imagine this is some expensive CPU work.
        ok = val + val2
        print("Passing {} to parent".format(ok))
        q.put(ok)  # The Queue can be used with the normal blocking API, too.
        item = q.get()
        print("got {} back from parent".format(item))

    def do_some_async_io_task():
        # Imagine there's some kind of asynchronous I/O
        # going on here that utilizes asyncio.
        asyncio.sleep(5)

    @asyncio.coroutine
    def do_work(q):
        loop.run_in_executor(ProcessPoolExecutor(), do_proc_work, q, 1, 2)
        do_some_async_io_task()
        # Non-blocking get that won't affect our io_task.
        item = yield from q.coro_get()
        print("Got {} from worker".format(item))
        item = item + 25
        yield from q.coro_put(item)

    if __name__ == "__main__":
        # AsyncProcessQueue is our new asyncio-friendly version
        # of multiprocessing.Queue.
        q = AsyncProcessQueue()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(do_work(q))

I have seen some rumblings about a desire to do this kind of integration on the bug tracker (http://bugs.python.org/issue10037#msg162497 and http://bugs.python.org/issue9248#msg221963), though that discussion is specifically tied to merging the enhancements from the Billiard library into multiprocessing.Pool. Are there still plans to do that? If so, should asyncio integration with multiprocessing be rolled into those plans, or does it make sense to pursue it separately?

Even more generally, do people think this kind of integration is a good idea to begin with? I know using asyncio is primarily about *avoiding* the headaches of concurrent threads/processes, but there are always going to be cases where CPU-intensive work is required in a primarily I/O-bound application. The easier it is for developers to handle those use cases, the better, IMO.

Note that the same sort of integration could be done with the threading module, though I think there's a fairly limited use case for that; in most cases where you'd want threads rather than processes, you could probably just use non-blocking I/O instead.

Thanks,
Dan
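As a concrete sketch of the Lock idea mentioned above, a coroutine-friendly wrapper might look something like the following. Note that AsyncProcessLock and coro_acquire are invented names for illustration, not an existing API, and delegating the blocking acquire() to a thread is only one possible implementation:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    from multiprocessing import Lock

    class AsyncProcessLock:
        # Hypothetical wrapper exposing multiprocessing.Lock's blocking
        # acquire() as a coroutine.
        def __init__(self):
            self._lock = Lock()
            self._executor = ThreadPoolExecutor(max_workers=1)

        @asyncio.coroutine
        def coro_acquire(self):
            # Run the blocking acquire() in a thread so the event loop
            # stays responsive while we wait.
            loop = asyncio.get_event_loop()
            return (yield from loop.run_in_executor(
                self._executor, self._lock.acquire))

        def release(self):
            # release() never blocks, so it needs no executor.
            self._lock.release()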

I actually know very little about multiprocessing (I have never used it), but I imagine the way you normally interact with it is through synchronous calls that talk to the subprocesses and their work queues and so on, right? In the asyncio world you would put that work in a thread and then use run_in_executor() with a thread executor -- the thread would then manage the subprocesses and talk to them. While you are waiting for that thread to complete, your other coroutines will still work. Unless you want to rewrite the communication and process management as coroutines, but that sounds like a lot of work.

On Sat, Jul 26, 2014 at 1:59 PM, Dan O'Reilly <oreilldf@gmail.com> wrote:
-- --Guido van Rossum (python.org/~guido)
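A minimal sketch of the pattern Guido describes, with an invented worker function (square) standing in for real subprocess management:

    import asyncio
    import multiprocessing
    from concurrent.futures import ThreadPoolExecutor

    def square(x):
        return x * x

    def manage_subprocesses(n):
        # Ordinary synchronous multiprocessing code: start workers,
        # hand out tasks, and block until the results come back.
        with multiprocessing.Pool() as pool:
            return pool.map(square, range(n))

    @asyncio.coroutine
    def main(loop):
        # The blocking multiprocessing interaction runs in a thread,
        # so other coroutines keep running while we wait for it.
        result = yield from loop.run_in_executor(
            ThreadPoolExecutor(max_workers=1), manage_subprocesses, 5)
        print(result)  # [0, 1, 4, 9, 16]

    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))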

Right, this is the same approach I've used myself. For example, the AsyncProcessQueue in my example above was implemented like this:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    from multiprocessing import Manager, cpu_count

    def AsyncProcessQueue(maxsize=0):
        m = Manager()
        q = m.Queue(maxsize=maxsize)
        return _ProcQueue(q)

    class _ProcQueue(object):
        def __init__(self, q):
            self._queue = q
            self._executor = self._get_executor()
            self._cancelled_join = False

        def __getstate__(self):
            # Copy the dict so pickling doesn't clobber the live
            # object's executor; executors can't be pickled.
            self_dict = self.__dict__.copy()
            self_dict['_executor'] = None
            return self_dict

        def _get_executor(self):
            return ThreadPoolExecutor(max_workers=cpu_count())

        def __setstate__(self, self_dict):
            self_dict['_executor'] = self._get_executor()
            self.__dict__.update(self_dict)

        def __getattr__(self, name):
            if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                        'get', 'get_nowait', 'close']:
                return getattr(self._queue, name)
            else:
                raise AttributeError("'%s' object has no attribute '%s'" %
                                     (self.__class__.__name__, name))

        @asyncio.coroutine
        def coro_put(self, item):
            loop = asyncio.get_event_loop()
            return (yield from loop.run_in_executor(
                self._executor, self.put, item))

        @asyncio.coroutine
        def coro_get(self):
            loop = asyncio.get_event_loop()
            return (yield from loop.run_in_executor(
                self._executor, self.get))

        def cancel_join_thread(self):
            self._cancelled_join = True
            self._queue.cancel_join_thread()

        def join_thread(self):
            self._queue.join_thread()
            if self._executor and not self._cancelled_join:
                self._executor.shutdown()

I'm wondering if a complete library providing this kind of behavior for all or some subset of multiprocessing is worth adding to the asyncio module, or if you prefer users to deal with this on their own (or perhaps just distribute something that provides this behavior as a stand-alone library). I suppose adding asyncio-friendly methods to the existing objects in multiprocessing is also an option, but I doubt it's desirable to add asyncio-specific code to modules other than asyncio.

It also sort of sounds like some of the work that's gone on in Billiard would make the alternative, more complicated approach you mentioned a realistic possibility, at least going by this comment by Ask Solem (from http://bugs.python.org/issue9248#msg221963):
> we have a version of multiprocessing.Pool using async IO and one pipe per process that drastically improves performance and also avoids the threads+forking issues (well, not the initial fork), but I have not yet adapted it to use the new asyncio module in 3.4.
I don't know the details there, though. Hopefully someone more familiar with Billiard/multiprocessing than I am can provide some additional information. On Sat, Jul 26, 2014 at 10:39 PM, Guido van Rossum <guido@python.org> wrote:
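For reference, here is a minimal end-to-end sketch of driving the wrapper above (the worker function is invented for illustration). The __getstate__/__setstate__ hooks exist precisely so the queue can be pickled into a child process, dropping the thread executor on the way out and recreating it on the way in:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    def worker(q):
        # Runs in a child process; the plain blocking API works here.
        q.put(sum(range(10)))

    @asyncio.coroutine
    def main(loop, q):
        fut = loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                                   worker, q)
        item = yield from q.coro_get()  # waits without blocking the loop
        print("got", item)
        yield from fut  # make sure the worker finished cleanly

    if __name__ == "__main__":
        q = AsyncProcessQueue()  # the factory defined above
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop, q))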

I'm going to go out on a limb here and say that it feels too early to me. First someone has to actually solve this problem well as a 3rd party package before we can talk about adding it to the asyncio package. It doesn't actually sound like Billiard has adapted to asyncio yet (not that I have any idea what Billiard is -- it sounds like a fork of multiprocessing, actually?).

On Sat, Jul 26, 2014 at 8:34 PM, Dan O'Reilly <oreilldf@gmail.com> wrote:
-- --Guido van Rossum (python.org/~guido)

On Jul 26, 2014, at 10:43 PM, Guido van Rossum <guido@python.org> wrote:
> I'm going to go out on a limb here and say that it feels too early to me. First someone has to actually solve this problem well as a 3rd party package before we can talk about adding it to the asyncio package. It doesn't actually sound like Billiard has adapted to asyncio yet (not that I have any idea what Billiard is -- it sounds like a fork of multiprocessing, actually?).
Yep, Billiard is a fork of multiprocessing: https://pypi.python.org/pypi/billiard

For what it's worth, I ended up writing a module that takes the "threads on top of regular multiprocessing" approach to integrate the two: https://github.com/dano/aioprocessing. Re-implementing multiprocessing to use asyncio internally, while an interesting exercise, would require a very large amount of effort both to implement and maintain alongside the current multiprocessing module. I'm not sure it's really worth it when using threads on top of multiprocessing gives you the same effect without requiring you to basically re-implement large parts of the multiprocessing module. Anyway, we'll see if it ends up getting much use... On Sat, Jul 26, 2014 at 11:48 PM, Ryan Hiebert <ryan@ryanhiebert.com> wrote:
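A short sketch of what using aioprocessing looks like. The names below (AioQueue, AioProcess, coro_get, coro_join) are taken from the project's README; treat the details as approximate rather than authoritative:

    import asyncio
    import aioprocessing

    def worker(q):
        # Runs in the child process; the plain blocking API still works.
        q.put("hello from the child")

    @asyncio.coroutine
    def main(q, p):
        item = yield from q.coro_get()  # delegates to a thread internally
        print(item)
        yield from p.coro_join()        # coroutine-friendly join

    if __name__ == "__main__":
        q = aioprocessing.AioQueue()
        p = aioprocessing.AioProcess(target=worker, args=(q,))
        p.start()
        asyncio.get_event_loop().run_until_complete(main(q, p))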

This looks neat. I skimmed the README, and I noticed you marked up most "yield from" expressions with "# non blocking". That feels confusing to me, because when I read asyncio code, I think of "yield from" as blocking (the task, if not the world :-). What do you think?

--Guido

On Tuesday, October 7, 2014, Dan O'Reilly <oreilldf@gmail.com> wrote:
-- --Guido van Rossum (on iPad)

Ah, right. I find the terminology confusing myself, and I probably got it wrong. I meant "non-blocking" as in "this won't block the event loop", not "this won't block the coroutine". I'll try to clarify that. Thanks!

On Thu, Oct 9, 2014 at 12:03 AM, Guido van Rossum <guido@python.org> wrote:

It takes time to get used to it, but eventually you don't have to think about the event loop. There really is no need for any comment of that kind at all. The beauty is that *nothing* blocks the event loop... On Wednesday, October 8, 2014, Dan O'Reilly <oreilldf@gmail.com> wrote:
-- --Guido van Rossum (on iPad)

Right - I was just calling it out in the example to highlight that the aioprocessing calls wouldn't block the event loop, as opposed to the equivalent multiprocessing calls, which would. Though I suppose that should be obvious from the use of "yield from". I'll just remove the comments from the example and add a sentence or two afterwards that clarifies the event loop is never blocked. On Thu, Oct 9, 2014 at 12:35 AM, Guido van Rossum <guido@python.org> wrote:
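The distinction under discussion is easy to demonstrate. In the small sketch below (names invented for illustration), the first coroutine is suspended at its "yield from" for two seconds, yet the second keeps ticking the whole time: the task blocks, but the event loop never does.

    import asyncio

    @asyncio.coroutine
    def waiter():
        # "Blocks" this task for two seconds...
        yield from asyncio.sleep(2)
        print("waiter done")

    @asyncio.coroutine
    def ticker():
        # ...but the event loop keeps running other tasks meanwhile.
        for i in range(4):
            print("tick", i)
            yield from asyncio.sleep(0.5)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(waiter(), ticker()))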

On 27 July 2014 13:34, Dan O'Reilly <oreilldf@gmail.com> wrote:
Actually, having asyncio act as a "nexus" for asynchronous I/O backends is one of the reasons for its existence. The asyncio event loop is pluggable, so making multiprocessing asyncio-friendly (whether directly, or as an add-on library that bridges the two) *also* has the effect of making it compatible with all the other asynchronous event loops that can be plugged into the asyncio framework.

I'm inclined to agree with Guido, though - while I think making asyncio and multiprocessing play well together is a good idea in principle, I think we're still in the "third party exploration phase" of that integration. Once folks figure out good ways to do it, *then* we can start talking about making that integration a default part of Python 3.5 or 3.6+.

Cheers, Nick.

-- Nick Coghlan | ncoghlan@gmail.com | Brisbane, Australia

Participants (4):

- Dan O'Reilly
- Guido van Rossum
- Nick Coghlan
- Ryan Hiebert