[Python-ideas] Are there asynchronous generators?
Nick Coghlan
ncoghlan at gmail.com
Tue Jun 30 06:08:19 CEST 2015
On 30 June 2015 at 07:51, Ron Adam <ron3200 at gmail.com> wrote:
>
> On 06/29/2015 07:23 AM, Nick Coghlan wrote:
>>
>>
>> On 29 Jun 2015 7:33 pm, "Guido van Rossum"
>> <guido at python.org
>> <mailto:guido at python.org>> wrote:
>> >
>> > Not following this in detail, but want to note that async isn't a good
>> model for parallelization (except I/O) because the expectation of
>> coroutines is single threading. The event loop serializes callbacks.
>> Changing this would break expectations and code.
>>
>> Yeah, it's a bad idea - I realised after reading your post that because
>> submission for scheduling and waiting for a result can already be
>> separated
>> it should be possible in Py 3.5 to write a "parallel" asynchronous
>> iterator
>> that eagerly consumes the awaitables produced by another asynchronous
>> iterator, schedules them all, then produces the awaitables in order.
>>
>> (That idea is probably as clear as mud without code to show what I
>> mean...)
>
>
> Only the parts concerning "schedules them all", and "produces awaitables in
> order". ;-)
Some completely untested conceptual code that may not even compile,
let alone run, but hopefully conveys what I mean better than English
does:
def get_awaitables(self, async_iterable):
"""Gets a list of awaitables from an asynchronous iterator"""
asynciter = async_iterable.__aiter__()
awaitables = []
while True:
try:
awaitables.append(asynciter.__anext__())
except StopAsyncIteration:
break
return awaitables
async def wait_for_result(awaitable):
"""Simple coroutine to wait for a single result"""
return await awaitable
def iter_coroutines(async_iterable):
"""Produces coroutines to wait for each result from an
asynchronous iterator"""
for awaitable in get_awaitables(async_iterable):
yield wait_for_result(awaitable)
def iter_tasks(async_iterable, eventloop=None):
"""Schedules event loop tasks to wait for each result from an
asynchronous iterator"""
if eventloop is None:
eventloop = asyncio.get_event_loop()
for coroutine in iter_coroutines(async_iterable):
yield eventloop.create_task(coroutine)
class aiter_parallel:
"""Asynchronous iterator to wait for several asynchronous
operations in parallel"""
def __init__(self, async_iterable):
# Concurrent evaluation of future results is launched immediately
self._tasks = tasks = list(iter_tasks(async_iterable))
self._taskiter = iter(tasks)
def __aiter__(self):
return self
def __anext__(self):
try:
return next(self._taskiter)
except StopIteration:
raise StopAsyncIteration
# Example reduction function
async def sum_async(async_iterable, start=0):
tally = start
async for x in aiter_parallel(async_iterable):
tally += x
return x
# Parallel sum from synchronous code:
result = asyncio.get_event_loop().run_until_complete(sum_async(async_iterable))
# Parallel sum from asynchronous code:
result = await sum_async(async_iterable))
As the definition of "aiter_parallel" shows, we don't offer any nice
syntactic sugar for defining asynchronous iterators yet (hence the
question that started this thread). Hopefully the above helps
illustrate the complexity hidden behind such a deceptively simple
question :)
Cheers,
Nick.
--
Nick Coghlan | ncoghlan at gmail.com | Brisbane, Australia
More information about the Python-ideas
mailing list