[Python-ideas] Are there asynchronous generators?

Nick Coghlan ncoghlan at gmail.com
Tue Jun 30 06:08:19 CEST 2015


On 30 June 2015 at 07:51, Ron Adam <ron3200 at gmail.com> wrote:
>
> On 06/29/2015 07:23 AM, Nick Coghlan wrote:
>>
>>
>> On 29 Jun 2015 7:33 pm, "Guido van Rossum" <guido at python.org> wrote:
>>  >
>>  > Not following this in detail, but want to note that async isn't a good
>> model for parallelization (except I/O) because the expectation of
>> coroutines is single threading. The event loop serializes callbacks.
>> Changing this would break expectations and code.
>>
>> Yeah, it's a bad idea - I realised after reading your post that because
>> submission for scheduling and waiting for a result can already be
>> separated
>> it should be possible in Py 3.5 to write a "parallel" asynchronous
>> iterator
>> that eagerly consumes the awaitables produced by another asynchronous
>> iterator, schedules them all, then produces the awaitables in order.
>>
>> (That idea is probably as clear as mud without code to show what I
>> mean...)
>
>
> Only the parts concerning "schedules them all", and "produces awaitables in
> order".   ;-)

Some completely untested conceptual code that may not even compile,
let alone run, but hopefully conveys what I mean better than English
does:

    import asyncio

    def get_awaitables(async_iterable):
        """Gets a list of awaitables from an asynchronous iterator"""
        # Note: this loop only terminates if the iterator's __anext__ raises
        # StopAsyncIteration eagerly (synchronously), rather than only when
        # the awaitable it returns is awaited
        asynciter = async_iterable.__aiter__()
        awaitables = []
        while True:
            try:
                awaitables.append(asynciter.__anext__())
            except StopAsyncIteration:
                break
        return awaitables

    async def wait_for_result(awaitable):
        """Simple coroutine to wait for a single result"""
        return await awaitable

    def iter_coroutines(async_iterable):
        """Produces coroutines to wait for each result from an asynchronous iterator"""
        for awaitable in get_awaitables(async_iterable):
            yield wait_for_result(awaitable)

    def iter_tasks(async_iterable, eventloop=None):
        """Schedules event loop tasks to wait for each result from an asynchronous iterator"""
        if eventloop is None:
            eventloop = asyncio.get_event_loop()
        for coroutine in iter_coroutines(async_iterable):
            yield eventloop.create_task(coroutine)

    class aiter_parallel:
        """Asynchronous iterator to wait for several asynchronous operations in parallel"""
        def __init__(self, async_iterable):
            # Concurrent evaluation of future results is launched immediately
            self._tasks = tasks = list(iter_tasks(async_iterable))
            self._taskiter = iter(tasks)
        def __aiter__(self):
            return self
        def __anext__(self):
            # The scheduled tasks are themselves awaitable, so they can be
            # handed out directly
            try:
                return next(self._taskiter)
            except StopIteration:
                raise StopAsyncIteration

    # Example reduction function
    async def sum_async(async_iterable, start=0):
        tally = start
        async for x in aiter_parallel(async_iterable):
            tally += x
        return tally

    # Parallel sum from synchronous code:
    result = asyncio.get_event_loop().run_until_complete(sum_async(async_iterable))

    # Parallel sum from asynchronous code (i.e. inside a coroutine):
    result = await sum_async(async_iterable)
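
For concreteness, here's one hypothetical source of awaitables that could be
fed into the above (the class, its delay and its values are invented purely
for illustration; note that its __anext__ raises StopAsyncIteration eagerly,
as get_awaitables above requires):

    class delayed_values:
        """Hand-written asynchronous iterator producing one awaitable per value"""
        def __init__(self, values, delay=0.1):
            self._values = iter(values)
            self._delay = delay
        def __aiter__(self):
            return self
        def __anext__(self):
            # Hand out one awaitable per value, raising StopAsyncIteration
            # eagerly once the underlying values run out
            try:
                value = next(self._values)
            except StopIteration:
                raise StopAsyncIteration
            return self._await_value(value)
        async def _await_value(self, value):
            # Stand-in for a real asynchronous operation (e.g. network I/O)
            await asyncio.sleep(self._delay)
            return value

    async_iterable = delayed_values(range(10))
    result = asyncio.get_event_loop().run_until_complete(sum_async(async_iterable))
    assert result == sum(range(10))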

As the definition of "aiter_parallel" shows, we don't offer any nice
syntactic sugar for defining asynchronous iterators yet (hence the
question that started this thread). Hopefully the above helps
illustrate the complexity hidden behind such a deceptively simple
question :)
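
For a plain reduction like sum_async, much the same effect could presumably
be had by collecting the awaitables and handing them straight to
asyncio.gather; the iterator version above just preserves the async-for
interface. A hypothetical sum_async_gather, equally untested:

    async def sum_async_gather(async_iterable, start=0):
        # gather schedules everything up front and returns the results in
        # the original order, so an ordinary sum() finishes the job
        awaitables = get_awaitables(async_iterable)
        return sum(await asyncio.gather(*awaitables), start)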

Cheers,
Nick.

-- 
Nick Coghlan   |   ncoghlan at gmail.com   |   Brisbane, Australia

