async ProcessPoolExecutor

Hi,

While working on a parallel numerical quadrature routine that features speculative evaluation, I got interested in async programming in Python. Most async programming seems to be done for network applications, but it seems to me that controlling complex parallel computations could also be a promising application. The asyncio package has the run_in_executor function that seemed to be just what I needed. When looking at how it works, I noticed that dispatching tasks from an asyncio event loop to a concurrent.futures.ProcessPoolExecutor involves a lot of wrapping and locking, and hence complexity and inefficiency.

Shouldn't it be possible to write an async version of ProcessPoolExecutor? I did just this and the results seem quite promising. Consider the following program:

****************************************************************
from concurrent import futures
import time

def work(x):
    return x**2

def main():
    executor = futures.ProcessPoolExecutor(48)
    now = time.time()
    fs = [executor.submit(work, x) for x in range(10000)]
    print(sum(f.result() for f in futures.as_completed(fs)))
    print(time.time() - now)

if __name__ == "__main__":
    main()
****************************************************************

It takes more than 5 seconds on my machine. It becomes even worse (7 seconds) when asyncio with run_in_executor is used. Using the async executor, the following variant of the above script runs in 2 seconds:

****************************************************************
import asyncio
import time
import aexecutor

def work(x):
    return x**2

async def main_coro():
    async with aexecutor.ProcessPoolExecutor(48) as executor:
        now = time.time()
        fs = [executor.submit(work, x) for x in range(10000)]
        acc = 0
        for f in asyncio.as_completed(fs):
            acc += await f
        print(acc)
        print(time.time() - now)

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_coro())

if __name__ == "__main__":
    main()
****************************************************************

The difference becomes smaller when the size of the worker pool is reduced, but it is significant even for a few workers. And I am actually interested in controlling a huge number of workers (I do computations on a cluster with 20 nodes of 48 cores each). What's more, the source code of the executor got reduced from around 1100 lines to 600.

Please see the attachment for the source of the "aexecutor" package as well as the three test programs. (All to be run with Python 3.5.)

I expect that the async executor will be useful for me, not just for the gain in efficiency, but also for having a simple executor to play with. (I think that I will need futures with a priority, and I might also want to write an MPI version of it.) Perhaps this is something that could be interesting for the wider Python community?

Bear in mind that this is just a quick hack and my first project that uses asyncio. It's most likely buggy and non-portable (it does work on Unix). It can probably be simplified further, and there must be a way to improve the inner loop of the scheduler coroutine, which currently looks like this:

****************************************************************
reader = result_queue._reader
event = asyncio.Event(loop=loop)
loop.add_reader(reader._handle, event.set)
while True:
    (...)
    while True:
        await event.wait()
        if reader.poll(0):
            break
        else:
            event.clear()
    result_item = reader.recv()
    event.clear()
    (...)
****************************************************************

Any ideas?

Christoph
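P.S. The run_in_executor variant that takes about 7 seconds is not shown in this post. Reconstructed from the description above (a rough sketch, not copied from the attachment), it looks something like this:

****************************************************************
import asyncio
import time
from concurrent import futures

def work(x):
    return x**2

async def main_coro(loop, executor):
    now = time.time()
    # Dispatch the same 10000 calls, but through the event loop's
    # run_in_executor wrapper instead of submitting directly.
    fs = [loop.run_in_executor(executor, work, x) for x in range(10000)]
    acc = 0
    for f in asyncio.as_completed(fs):
        acc += await f
    print(acc)
    print(time.time() - now)

def main():
    executor = futures.ProcessPoolExecutor(48)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_coro(loop, executor))

if __name__ == "__main__":
    main()
****************************************************************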

My first post only hints at how "aexecutor" works. Here is some more information:

concurrent.futures.ProcessPoolExecutor uses several queues to keep track of the pending work. There's one (potentially long) queue.Queue instance that holds the work items that have been submitted. And there are two (short) queues from multiprocessing that are shared between the master process and the worker processes: one for dispatching work, and one for receiving results. These queues are managed by a routine that runs in a separate thread of the master process. Because of this separate manager thread, the Futures from concurrent.futures need to be thread-safe.

The "aexecutor" package began as a copy of concurrent.futures. Subsequently, I replaced bits by equivalents from asyncio as far as possible. This also necessitated some changes to how it works, but the interface remained mostly unchanged. The specific changes are most clearly visible by diffing against process.py from concurrent.futures. The resulting package is free of locks and threads, except as used internally by the multiprocessing.Queue instances.
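To illustrate the difference, here is a rough sketch (not the actual aexecutor code) of how results can be collected directly by the event loop instead of by a manager thread. It assumes result items that carry work_id, exception and result attributes, like the ones used internally by concurrent.futures.process:

****************************************************************
def _on_result_ready(reader, pending):
    # Runs in the event loop whenever the result pipe becomes readable:
    # drain everything that is currently available and resolve the
    # corresponding asyncio futures.  No manager thread, no locks.
    while reader.poll(0):
        result_item = reader.recv()
        future = pending.pop(result_item.work_id, None)
        if future is None or future.cancelled():
            continue
        if result_item.exception is not None:
            future.set_exception(result_item.exception)
        else:
            future.set_result(result_item.result)

def _watch_results(loop, result_queue, pending):
    # `pending` maps work ids to asyncio.Future instances created by
    # submit(); `result_queue` is the multiprocessing queue that the
    # workers write their results to.
    reader = result_queue._reader
    loop.add_reader(reader.fileno(), _on_result_ready, reader, pending)
****************************************************************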