[Python-ideas] async ProcessPoolExecutor
Christoph Groth
christoph at grothesque.org
Wed Jun 1 12:48:58 EDT 2016
Hi,
While working on a parallel numerical quadrature routine that features
speculative evaluation, I got interested in async programming in Python.
Most async programming seems to be done for network applications, but it
seems to me that controlling complex parallel computations could also be
a promising application.
The asyncio event loop has a run_in_executor method that seemed to be
just what I need.  When looking at how it works, I noticed that
dispatching tasks from an asyncio event loop to a
concurrent.futures.ProcessPoolExecutor involves a lot of wrapping and
locking, and hence complexity and inefficiency.
Shouldn't it be possible to write an async version of
ProcessPoolExecutor?
I did just this and the results seem quite promising. Consider the
following program:
****************************************************************
from concurrent import futures
import time

def work(x):
    return x**2

def main():
    executor = futures.ProcessPoolExecutor(48)
    now = time.time()
    fs = [executor.submit(work, x) for x in range(10000)]
    print(sum(f.result() for f in futures.as_completed(fs)))
    print(time.time() - now)

if __name__ == "__main__":
    main()
****************************************************************
It takes more than 5 seconds on my machine. It becomes even worse (7
seconds) when asyncio with run_in_executor is used.
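For reference, here is a minimal sketch of what the run_in_executor
variant might look like (assuming the same work() function and pool size
as above):
****************************************************************
import asyncio
import time
from concurrent import futures

def work(x):
    return x**2

async def main_coro(loop, executor):
    now = time.time()
    fs = [loop.run_in_executor(executor, work, x) for x in range(10000)]
    acc = 0
    for f in asyncio.as_completed(fs):
        acc += await f
    print(acc)
    print(time.time() - now)

def main():
    loop = asyncio.get_event_loop()
    with futures.ProcessPoolExecutor(48) as executor:
        loop.run_until_complete(main_coro(loop, executor))

if __name__ == "__main__":
    main()
****************************************************************
Each result here travels through a concurrent.futures.Future that
asyncio then wraps into one of its own futures, which seems to be where
much of the extra overhead comes from.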
Using the async executor, the following variant of the above script runs
in 2 seconds:
****************************************************************
import asyncio
import time

import aexecutor

def work(x):
    return x**2

async def main_coro():
    async with aexecutor.ProcessPoolExecutor(48) as executor:
        now = time.time()
        fs = [executor.submit(work, x) for x in range(10000)]
        acc = 0
        for f in asyncio.as_completed(fs):
            acc += await f
        print(acc)
        print(time.time() - now)

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_coro())

if __name__ == "__main__":
    main()
****************************************************************
The difference becomes smaller when the size of the worker pool is
reduced, but it remains significant even for a few workers.  And I am
actually interested in controlling a huge number of workers (I run
computations on a cluster of 20 nodes with 48 cores each).
What's more, the source code of the executor got reduced from around
1100 lines to 600.
Please see the attachment for the source of the "aexecutor" package as
well as the three test programs. (All to be run with Python 3.5.)
I expect that the async executor will be useful for me, not just for
the gain in efficiency, but also for having a simple executor to play
with.  (I think that I will need futures with a priority, and I might
also want to write an MPI version of it.)  Perhaps this is something
that could be interesting for the wider Python community?
Bear in mind that this is just a quick hack and my first project that
uses asyncio.  It's most likely buggy and non-portable (it does work on
Unix).  It can probably be simplified further, and there must be a way
to improve the inner loop of the scheduler coroutine, which currently
looks like this:
****************************************************************
reader = result_queue._reader
event = asyncio.Event(loop=loop)
loop.add_reader(reader._handle, event.set)

while True:
    (...)
    while True:
        await event.wait()
        if reader.poll(0):
            break
        else:
            event.clear()
    result_item = reader.recv()
    event.clear()
    (...)
****************************************************************
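One direction that might simplify this (untested): instead of the
event-wait/poll dance, the add_reader callback could drain the pipe
directly into an asyncio.Queue that the scheduler then awaits.  Here is
a self-contained sketch of that pattern, using a plain
multiprocessing.Pipe in place of result_queue._reader and assuming that
a readable pipe always yields complete, small result items:
****************************************************************
import asyncio
import multiprocessing

def demo():
    loop = asyncio.get_event_loop()
    reader, writer = multiprocessing.Pipe(duplex=False)
    queue = asyncio.Queue(loop=loop)

    def on_readable():
        # Called by the event loop whenever the pipe becomes readable;
        # drain everything that is currently available.
        while reader.poll(0):
            queue.put_nowait(reader.recv())

    loop.add_reader(reader.fileno(), on_readable)

    async def consume(n):
        acc = 0
        for _ in range(n):
            acc += await queue.get()
        return acc

    for i in range(10):
        writer.send(i)
    print(loop.run_until_complete(consume(10)))

if __name__ == "__main__":
    demo()
****************************************************************
This would get rid of the poll/clear bookkeeping because the queue
itself does the buffering and the wakeups.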
Any ideas?
Christoph
-------------- next part --------------
A non-text attachment was scrubbed...
Name: aexecutor-test.tar.gz
Type: application/gzip
Size: 6950 bytes
Desc: not available
URL: <http://mail.python.org/pipermail/python-ideas/attachments/20160601/4452f1a3/attachment.bin>