[Python-ideas] The async API of the future: yield-from

Nick Coghlan ncoghlan at gmail.com
Wed Oct 17 06:21:27 CEST 2012


On Wed, Oct 17, 2012 at 6:27 AM, Greg Ewing <greg.ewing at canterbury.ac.nz> wrote:
> Nick Coghlan wrote:
>
>>     # Note that this is an *ordinary iterator*, not a tasklet
>>     def as_completed(futures):
>>         # We ensure all the operations have started, and get ourselves
>> a set to work with
>>         remaining = set(futures)
>>         while remaining:
>>             # The trick here is that we *don't yield the original
>> futures directly*
>>             # Instead, we yield
>>             yield _wait_first(remaining)
>
>
> I've just figured out how your as_completed() thing works,
> and realised that it's *not* a general solution to the
> suspendable-iterator problem. You're making use of the fact
> that you know *how many* values there will be ahead of time,
> even if you don't know what they are yet.
>
> In general this won't be the case. I don't think there is
> any trick that will allow a for-loop to be used in the general
> case, because in order for an iterator to be suspendable, the
> call to next() would need to be made using yield-from, and
> it's hidden inside the for-loop implementation.

Yeah, that's what lets me get away with not passing the sent results
back down into the iterator (it can figure out from the original
arguments when it needs to stop). It gets trickier if you want to
terminate the iteration based on the result of an asynchronous
operation.

For example, here's a very simplistic way you could apply the concept
of "yield a future to be handled in the loop body" to the operation of
continuously reading binary data from a connection until EOF is
received:

    def read(self):
        """This knows how to start an IO operation such the future
will fire on completion"""
        future = ...
        return future

    # Again, notice this is *not* a tasklet, it's an ordinary iterator
that produces Future objects
    def readall(self):
        """This can be used in two modes - as an iterator or as a coroutine.

        As a coroutine:
            data = yield from conn.readall()

        As an iterator:
            for wait_for_chunk in conn.readall():
                try:
                    chunk = yield wait_for_chunk
                except EOFError:
                    break

        Obviously, the coroutine mode is far more convenient, but you
*can* override
        the default accumulator behaviour if you want/need to by
waiting on the individual
        futures explicitly. However, in this case, you lose the
automatic loop termination
        behaviour, so, you may as well implement the underlying loop explicitly:

            while 1:
                try:
                    chunk = yield self.read()
                except EOFError:
                    break

        """
        output = io.BytesIO()
        while 1:
            try:
                data = yield self.read()
            except EOFError:
                break
            if data: # This check makes iterator mode possible
                output.write(data)
        return output.getvalue()

Impedance matching in a way that allows the exception handling to be
factored out as well as the iteration step is a *lot* trickier, since
you need to bring context managers into play if termination is
signalled by an exception:

    # This version produces context managers rather than producing
futures directly, and thus can't be
    # used directly as a coroutine
    def read_chunks(self):
        finished = False
        @contextmanager
        def handle_chunk():
            nonlocal finished
            data = b''
            try:
                data = yield self.read()
            except EOFError:
                finished = True
            return data
        while not finished:
            yield handle_chunk()

    # Usage
    for handle_chunk in conn.read_chunks():
        with handle_chunk as wait_for_chunk:
            chunk = yield from wait_for_chunk
        # We end up doing a final "extra" iteration with chunk = b''
        # So we'd likely need to guard with an "if chunk:" or "if not
chunk: continue"
        # which again means we're not getting much value out of using
the iterator

Using an explicit "add_done_callback" doesn't help much, as you still
have to deal with the exception being thrown back in to your
generator.

I know Guido doesn't want people racing off and designing new syntax
for asynchronous iteration, but I'm not sure it's going to be possible
to avoid it if we want a clean approach to "forking" the results of
asynchronous calls between passing them down into a coroutine (to
decide whether or not to terminate iteration) and binding them to a
local variable (to allow local processing in the loop body). Compare
the arcane incantations above to something like (similar to
suggestions previously made by Christian Heimes):

    def read_chunks(self):
        """Designed for use as an asynchronous iterator"""
        while 1:
            try:
                yield self.read()
            except EOFError:
                break

    # Usage
    for chunk in yield from conn.read_chunks():
        ...

The idea here would be that whereas "for chunk in (yield from
conn.read_chunks()):" runs the underlying coroutine to completion and
then iterates over the return value, the version without the
parentheses would effectively "tee" the values being sent back,
*first* sending them to the underlying coroutine (to decide whether or
not iteration should continue and to get the value to be yielded at
the start of the next iteration) and then, if that doesn't raise
StopIteration, binding them to the local variable and proceeding to
execution of the loop body.

All that said, I still like Guido's concept that the core asynchronous
API is *really* future objects, just as it already is in the
concurrent.futures module. The @task decorator and yielding future
objects to that decorator is then just nice syntactic sugar for
hooking generators up to the "add_done_callback" API of future
objects. It's completely independent of the underlying event loop
and/or asynchronous IO interfaces - those interfaces are about setting
things up to invoke the set_* methods of the returned future objects
correctly, just as they are with the Executor API in
concurrent.futures.

> I know you probably weren't intending as_completed() to be
> a solution to the general suspendable-iterator problem.

Right, I just wanted to be sure that *that particular use case* of
waiting for a collection of futures and processing them in completion
order could be handled in terms of Guido's API *without* needing any
extra magic. The "iterate over data chunks until EOFError is raised"
is a better example for highlighting the "how do you write an
asynchronous iterator?" problem when it comes to
generators-as-coroutines.

Cheers,
Nick.

-- 
Nick Coghlan   |   ncoghlan at gmail.com   |   Brisbane, Australia



More information about the Python-ideas mailing list