[Python-ideas] PEP 525: Asynchronous Generators

Yury Selivanov yselivanov.ml at gmail.com
Wed Aug 3 11:32:44 EDT 2016


Hi Stefan!

On 2016-08-03 2:45 AM, Stefan Behnel wrote:
> Hi!
>
> I'm all for this. I've run into it so many times already while implementing
> the async/await support in Cython, it's really a totally obvious extension
> to what there is currently, and it's practically a requirement for any
> serious usage of async iterators.
Thanks for the support!
>
> Some comments below.
>
> Yury Selivanov schrieb am 03.08.2016 um 00:31:
>> PEP 492 requires an event loop or a scheduler to run coroutines.
>> Because asynchronous generators are meant to be used from coroutines,
>> they also require an event loop to run and finalize them.
> Well, or *something* that uses them in the same way as an event loop would.
> Doesn't have to be an event loop.

Sure, I'm just using the same terminology PEP 492 was defined with.
We can say "coroutine runner" instead of "event loop".

>
>
>> 1. Implement an ``aclose`` method on asynchronous generators
>>     returning a special *awaitable*.  When awaited it
>>     throws a ``GeneratorExit`` into the suspended generator and
>>     iterates over it until either a ``GeneratorExit`` or
>>     a ``StopAsyncIteration`` occur.
>>
>>     This is very similar to what the ``close()`` method does to regular
>>     Python generators, except that an event loop is required to execute
>>     ``aclose()``.
> I don't see a motivation for adding an "aclose()" method in addition to the
> normal "close()" method. Similar for send/throw. Could you elaborate on that?

There will be no "close", "send" and "throw" defined for asynchronous
generators. Only their asynchronous equivalents.

This topic is actually quite complex, so bear with me.

1. It is important to understand that the asynchronous iteration
protocol is multiplexed into the normal iteration protocol.  For example:

   @types.coroutine
   def foobar():
       yield 'spam'

   async def agen():
       await foobar()
       yield 123

The 'agen' generator, at the lowest level of the generator implementation,
will yield two things -- 'spam', and a wrapped 123 value.  Because
123 is wrapped, the async generator machinery can distinguish async
yields from normal yields.

The idea behind the __anext__ coroutine is that it yields through all
"normal" yields, and raises a StopIteration when it encounters
a "wrapped" yield (the same idea is behind aclose(), athrow(), and asend()).
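
To make the multiplexing concrete, here is a small sketch that steps
the awaitable returned by __anext__() by hand, the way a coroutine
runner would.  The drive() helper is hypothetical and exists only for
this illustration; the sketch assumes a CPython version that ships
async generators (3.6 or later).

```python
import types


@types.coroutine
def foobar():
    # A "normal" yield: this value travels up to whoever drives the awaitable.
    yield 'spam'


async def agen():
    await foobar()
    # An "async" yield: wrapped internally, so it ends the __anext__() step.
    yield 123


def drive(awaitable):
    """Hypothetical helper: step an awaitable by hand.

    Returns (plain_yields, result), where plain_yields are the values that
    leaked through as normal yields and result is the value carried by the
    final StopIteration.
    """
    plain_yields = []
    while True:
        try:
            plain_yields.append(awaitable.send(None))
        except StopIteration as stop:
            return plain_yields, stop.value


plain, result = drive(agen().__anext__())
print(plain, result)
```

The 'spam' value passes through as an ordinary yield, while 123 comes
back as the value of the StopIteration that finishes the __anext__()
step.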

2. Now let's look at two generators (sync and async):


     def gen():                 async def agen():
         try:                       try:
             ...                        ...
         finally:                   finally:
             fin1()                     await afin1()
             fin2()                     await afin2()
             yield 123                  yield 123

* If we call 'gen().close()' when gen() is suspended somewhere in its
   try block, a GeneratorExit exception will be thrown into it.  Then
   the fin1() and fin2() calls will be executed.  Then the 'yield 123'
   line will run, which will cause a RuntimeError('generator yielded
   while closing').

   The reason for the RuntimeError is that the interpreter does not
   want generators to yield in their finally statements.  It wants
   them to synchronously finalize themselves.  Yielding while closing
   doesn't make any sense.

* Now, if we just reused the synchronous 'close()' method for
   'agen()', awaiting 'afin1()' would simply result in a
   RuntimeError('generator yielded while closing').

   So the close() implementation for agen() must allow some yields,
   to make 'await' expressions possible in the finally block.  This
   is something that is absolutely required, because instead of
   try..finally you could have 'async with' block in agen() -- so
   the ability to call asynchronous code in the 'finally' block
   is very important.

Therefore, it's necessary to introduce new close semantics for
asynchronous generators:

- It is OK to 'await' on anything in finally blocks in async
   generators.

- Trying to 'yield' in finally blocks will result in a
   RuntimeError('async generator yielded while closing') --
   similarly to sync generators.

- Because we have to allow awaits in a generator's finally blocks,
   the new 'close' method has to return a coroutine-like object.

Since all this is quite different from sync generators' close
method, it was decided to have a different name for this method
for async generators: aclose.

aclose() returns a coroutine-like object: you can await it, and
you can even throw a CancelledError into it, so it's possible
to write 'await asyncio.wait_for(agen.aclose(), timeout=1)'.
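
The difference between the two close semantics can be sketched as
follows.  This is only an illustration under stated assumptions: the
generator bodies, the 'log' list and the close_sync()/close_async()
helpers are made up for the example, and it uses asyncio.run(), which
needs Python 3.7 or later.

```python
import asyncio


def gen():
    try:
        yield 1
    finally:
        # Yielding while being closed is not allowed for sync generators.
        yield 2


async def agen(log):
    try:
        yield 1
    finally:
        # Awaiting while being closed is fine for async generators.
        await asyncio.sleep(0)
        log.append('cleaned up')


def close_sync():
    g = gen()
    next(g)                      # suspend inside the try block
    try:
        g.close()                # GeneratorExit is thrown in; finally yields
    except RuntimeError as exc:
        return type(exc).__name__
    return None


async def close_async():
    log = []
    g = agen(log)
    await g.__anext__()          # suspend inside the try block
    await g.aclose()             # the finally block may await
    return log


sync_result = close_sync()
async_result = asyncio.run(close_async())
print(sync_result, async_result)
```

The synchronous close() fails with a RuntimeError as soon as the
finally block yields, while aclose() lets the await in the finally
block complete before the generator is finished.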

3. asend() and athrow().  These are very similar to aclose().
Both have to be coroutines because async yields are multiplexed
into the normal yields that awaitables use behind the scenes.

     async def foo():
        await asyncio.sleep(1)
        yield 123

If we had a synchronous send() method defined for foo(), you'd
see something like this:

     gen = foo()
     gen.send(None)  # -> <asyncio.Future>

Instead, what you really want is this:

     gen = foo()
     await gen.asend(None)  # -> 123
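
A runnable sketch of the asend() case, assuming Python 3.7 or later for
asyncio.run().  The second yield and the main() coroutine are additions
made for this illustration so that a sent value has somewhere to go;
they are not part of the example above.

```python
import asyncio


async def foo():
    await asyncio.sleep(0.01)
    received = yield 123       # asend() delivers a value here
    yield received * 2


async def main():
    gen = foo()
    # The first asend() must send None, just like send() on a new generator.
    first = await gen.asend(None)
    # The Future used by asyncio.sleep() never reaches us; the event loop
    # consumes it, and we only see the async yields.
    second = await gen.asend(21)
    await gen.aclose()
    return first, second


first, second = asyncio.run(main())
print(first, second)
```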

4. I really recommend playing with the reference
implementation.  You can emulate synchronous "send" and "throw"
by doing this trick:

     gen = foo()
     gen.__anext__().send(None)         # emulates a synchronous send()
     gen.__anext__().throw(ValueError)  # emulates throw(); any exception works

>
>> 3. Add two new methods to the ``sys`` module:
>>     ``set_asyncgen_finalizer()`` and ``get_asyncgen_finalizer()``.
>>
>> The idea behind ``sys.set_asyncgen_finalizer()`` is to allow event
>> loops to handle generators finalization, so that the end user
>> does not need to care about the finalization problem, and it just
>> works.
>>
>> When an asynchronous generator is iterated for the first time,
>> it stores a reference to the current finalizer.  If there is none,
>> a ``RuntimeError`` is raised.  This provides a strong guarantee that
>> every asynchronous generator object will always have a finalizer
>> installed by the correct event loop.
>>
>> When an asynchronous generator is about to be garbage collected,
>> it calls its cached finalizer.  The assumption is that the finalizer
>> will schedule an ``aclose()`` call with the loop that was active
>> when the iteration started.
>>
>> For instance, here is how asyncio is modified to allow safe
>> finalization of asynchronous generators::
>>
>>     # asyncio/base_events.py
>>
>>     class BaseEventLoop:
>>
>>         def run_forever(self):
>>             ...
>>             old_finalizer = sys.get_asyncgen_finalizer()
>>             sys.set_asyncgen_finalizer(self._finalize_asyncgen)
>>             try:
>>                 ...
>>             finally:
>>                 sys.set_asyncgen_finalizer(old_finalizer)
>>                 ...
>>
>>         def _finalize_asyncgen(self, gen):
>>             self.create_task(gen.aclose())
>>
>> ``sys.set_asyncgen_finalizer()`` is thread-specific, so several event
>> loops running in parallel threads can use it safely.
> Phew, this adds quite some complexity and magic. That is a problem. For
> one, this uses a global setup, so There Can Only Be One of these
> finalizers. ISTM that if special cleanup is required, either the asyncgen
> itself should know how to do that, or we should provide some explicit API
> that does something when *initialising* the asyncgen. That seems better
> than doing something global behind the users' back. Have you considered
> providing some kind of factory in asyncio that wraps asyncgens or so?

set_asyncgen_finalizer is thread-specific, so you can have one
finalizer set up per thread.

The reference implementation actually integrates all of this into
asyncio.  The idea is to set up the loop's async generator finalizer
just before the loop starts, and reset the finalizer to the previous
one (usually None) just before it stops.

The finalizer is attached to a generator when it is iterated
for the first time -- this guarantees that every generator will
have the correct finalizer attached to it.

It's not right to attach the finalizer (or wrap the generator)
when the generator is initialized.  Consider this code:

    async def foo():
      async with smth():
        yield

    async def coro(gen):
      async for i in gen:
        ...

    loop.run_until_complete(coro(foo()))

^^ In the above example, when 'foo()' is instantiated, there
is no loop or finalizer set up at all.  BUT since a loop (or
coroutine runner) is required to iterate async generators, there
is a strong guarantee that it *will* be present on the first iteration.

Regarding "async gen itself should know how to clean up" -- that's not
possible.  An async gen could just have an async with block and then
be GCed (after being partially consumed).  Users won't expect to do
anything besides using try..finally or async with, so it's the
responsibility of the coroutine runner to clean up the async gen.
Hence 'aclose' is a coroutine, and hence this set_asyncgen_finalizer
API for coroutine runners.
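
Here is a rough sketch of how a coroutine runner can take on that
responsibility.  Note the assumptions: released CPython (3.6 and later)
exposes this mechanism as sys.set_asyncgen_hooks() and
sys.get_asyncgen_hooks() rather than under the set_asyncgen_finalizer
name used in this thread, and the run() and demo() functions below are
a hypothetical toy runner written only for this illustration, not the
asyncio implementation.

```python
import gc
import sys


async def agen(log):
    try:
        yield 1
    finally:
        log.append('finalized')


def run(coro):
    """Toy coroutine runner: only handles awaitables that never suspend."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value
    raise RuntimeError('toy runner cannot resume a suspended coroutine')


def demo():
    log = []
    pending = []  # generators the interpreter hands over at GC time

    old_hooks = sys.get_asyncgen_hooks()
    # The finalizer is thread-specific and is captured by each async
    # generator on its first iteration.
    sys.set_asyncgen_hooks(finalizer=pending.append)
    try:
        async def consume():
            gen = agen(log)
            await gen.__anext__()   # first iteration: finalizer is attached
            # 'gen' is dropped here, partially consumed and never closed.

        run(consume())
        gc.collect()
        # The runner, not the generator, schedules the asynchronous close.
        for gen in pending:
            run(gen.aclose())
    finally:
        sys.set_asyncgen_hooks(*old_hooks)
    return log, len(pending)


log, finalized_count = demo()
print(log, finalized_count)
```

A real event loop would schedule gen.aclose() as a task instead of
running it inline, which is what the _finalize_asyncgen() method in the
quoted asyncio snippet does.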

This is indeed the most magical part of the proposal.  It's
important to understand, though, that regular Python users will
likely never encounter this -- finalizers will be set up
by the framework they use (asyncio, Tornado, Twisted, you name it).

Thanks!
Yury

