
On 06/30/2015 12:08 AM, Nick Coghlan wrote:
On 30 June 2015 at 07:51, Ron Adam <ron3200@gmail.com> wrote:
On 06/29/2015 07:23 AM, Nick Coghlan wrote:
Some completely untested conceptual code that may not even compile, let alone run, but hopefully conveys what I mean better than English does:
It seems (to me) like there are more layers here than needed. I suppose since this is a higher order functionality, it may be the nature of it. <shrug>
def get_awaitables(self, async_iterable) """Gets a list of awaitables from an asynchronous iterator""" asynciter = async_iterable.__aiter__() awaitables = [] while True: try: awaitables.append(asynciter.__anext__()) except StopAsyncIteration: break return awaitables
async def wait_for_result(awaitable): """Simple coroutine to wait for a single result""" return await awaitable
def iter_coroutines(async_iterable): """Produces coroutines to wait for each result from an asynchronous iterator""" for awaitable in get_awaitables(async_iterable): yield wait_for_result(awaitable)
def iter_tasks(async_iterable, eventloop=None): """Schedules event loop tasks to wait for each result from an asynchronous iterator""" if eventloop is None: eventloop = asyncio.get_event_loop() for coroutine in iter_coroutines(async_iterable): yield eventloop.create_task(coroutine)
class aiter_parallel: """Asynchronous iterator to wait for several asynchronous operations in parallel""" def __init__(self, async_iterable): # Concurrent evaluation of future results is launched immediately self._tasks = tasks = list(iter_tasks(async_iterable)) self._taskiter = iter(tasks) def __aiter__(self): return self def __anext__(self): try: return next(self._taskiter) except StopIteration: raise StopAsyncIteration
# Example reduction function async def sum_async(async_iterable, start=0): tally = start async for x in aiter_parallel(async_iterable): tally += x return x
# Parallel sum from synchronous code: result = asyncio.get_event_loop().run_until_complete(sum_async(async_iterable))
# Parallel sum from asynchronous code: result = await sum_async(async_iterable))
As the definition of "aiter_parallel" shows, we don't offer any nice syntactic sugar for defining asynchronous iterators yet (hence the question that started this thread). Hopefully the above helps illustrate the complexity hidden behind such a deceptively simple question :)
While browsing the asyncio module, I decided to take a look at the multiprocessing module... from multiprocessing import Pool def async_map(fn, args): with Pool(processes=4) as pool: yield from pool.starmap(fn, args) def add(a, b): return a + b values = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)] print(sum(async_map(add, values))) #---> 55 That's really very nice. Are there advantages to asyncio over the multiprocessing module? Cheers, Ron