[Python-ideas] Are there asynchronous generators?

Oscar Benjamin oscar.j.benjamin at gmail.com
Wed Jul 1 14:01:50 CEST 2015


On 24 June 2015 at 10:00, Adam Bartoš <drekin at gmail.com> wrote:
>
> I had a generator producing pairs of values and wanted to feed all the first
> members of the pairs to one consumer and all the second members to another
> consumer. For example:
>
> def pairs():
>     for i in range(4):
>         yield (i, i ** 2)
>
> biconsumer(sum, list)(pairs()) -> (6, [0, 1, 4, 9])
>
> The point is I wanted the consumers to be suspended and resumed in a
> coordinated manner: The first consumer is invoked, it wants the first
> element. The coordinator implemented by the biconsumer function invokes
> pairs(), gets the first pair and yields its first member to the first
> consumer. Then it wants the next element, but now it's the second
> consumer's turn, so the first consumer is suspended and the second
> consumer is invoked and fed with the second member of the first pair.
> Then the second consumer wants the next element, but it's the first
> consumer's turn… and so on. In the end, when the stream of pairs is
> exhausted, StopIteration is thrown to both consumers and their results
> are combined.

Unfortunately this is not possible with generators or with coroutines.
Remember that the async coroutine stuff doesn't actually add any
fundamental new capability to the language. It's really just a cleaner
syntax for a particular way of using generators. Anything you can do
with coroutines is also possible with generators (hence 3.4's asyncio
does all its stuff with ordinary generators).

The problem is fundamental: iterable consumers like sum, list, etc.
drive the flow of control while they execute. You can suspend them by
feeding in a generator, but they aren't really suspended the way a
generator is, because the consumer remains at the base of the call
stack.
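
To see the problem concretely, here's a rough illustration using
itertools.tee and the pairs() generator from the quoted message. The
first consumer runs to completion before the second one starts, so tee
has to buffer every pair and the two consumers never interleave:

import itertools

def pairs():  # as in the quoted message
    for i in range(4):
        yield (i, i ** 2)

p1, p2 = itertools.tee(pairs())
total = sum(a for a, b in p1)     # sum drives pairs() all the way to the end
squares = list(b for a, b in p2)  # only now does list see anything; tee buffered it all
print(total, squares)             # 6 [0, 1, 4, 9]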

If you are able to rewrite the consumers, though, there is a fairly
simple way to restructure them using generators so that you can push
values in, suspending after each push.

Suppose I have a consumer function that looks like:

def func(iterable):
    <init>
    for x in iterable:
        <block>
    return <expr>

I can rewrite it as a feed-in generator like so:

def gfunc():
    <init>
    yield lambda: <expr>
    while True:
        x = yield
        <block>

When I call this function I get a generator. I can call next on that
generator to get a result function. I can then push values in with the
send method. When I'm done pushing values in I can call the result
function to get the final result. Example:

>>> def gsum():
...     total = 0
...     yield lambda: total
...     while True:
...         x = yield
...         total += x
...
>>> summer = gsum()
>>> result = next(summer)
>>> next(summer) # Advance to next yield
>>> summer.send(1)
>>> summer.send(2)
>>> summer.send(3)
>>> result()
6

You can make a decorator to handle the awkwardness of calling the
generator and next-ing it. You can also use the decorator to attach
the inverted consumer behaviour to the consumer function as an
attribute:

import functools

def inverted_consumer(func):

    @functools.wraps(func)
    def consumer(iterable):
        # Ordinary pull-style interface: drive the pushes ourselves.
        push, result = inverted()
        for x in iterable:
            push(x)
        return result()

    def inverted():
        # Push-style interface: prime the generator and hand back a
        # (push, result) pair so the caller decides when to push.
        gen = func()
        try:
            result = next(gen)  # first yield gives the result function
            next(gen)           # advance to the first "x = yield"
        except StopIteration:
            raise RuntimeError
        return gen.send, result

    consumer.invert = inverted

    return consumer

@inverted_consumer
def mean():
    total = 0
    count = 0
    yield lambda: total / count
    while True:
        x = yield
        total += x
        count += 1

print(mean([4, 5, 6]))  # prints 5.0

push, result = mean.invert()
push(4)
push(5)
push(6)
print(result())  # Also prints 5.0

Having implemented your consumer functions in this way you can use
them normally, but you can also implement the biconsumer functionality
that you wanted (with an obvious generalisation to an N-consumer
function):

def biconsumer(consumer1, consumer2, iterable):
    push1, result1 = consumer1.invert()
    push2, result2 = consumer2.invert()
    for val1, val2 in iterable:
        push1(val1)
        push2(val2)
    return result1(), result2()
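
For example, with push-style versions of sum and list written against
the decorator above (glist is just an illustrative name here, and
pairs() is as in the quoted message), the original example works out:

@inverted_consumer
def gsum():
    total = 0
    yield lambda: total
    while True:
        x = yield
        total += x

@inverted_consumer
def glist():
    items = []
    yield lambda: items
    while True:
        x = yield
        items.append(x)

print(biconsumer(gsum, glist, pairs()))  # (6, [0, 1, 4, 9])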

Given some of the complaints about two colours of functions in other
posts in this thread, perhaps asyncio could take a similar approach.
There could be a decorator so that I could define an async function
with:

@sync_callable
def func(...):
    ...

Then in asynchronous code I could call it as

    x = await func()

or in synchronous code it would be

    x = func.sync_call()

Presumably the sync_call version would fire up an event loop and run
the function until complete. Perhaps it could also take other
arguments and have a signature like:

def sync_call_wrapper(args, kwargs, *, loop=None, timeout=None):
    ...

I'm not sure how viable this is given that different asynchronous
functions might need different event loops etc. but maybe there's some
sensible way to do it.
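
Just to sketch the idea (sync_callable and sync_call are the
hypothetical names from above, and the get_event_loop /
run_until_complete calls are only one possible implementation), the
decorator could look roughly like:

import asyncio
import functools

def sync_callable(func):
    # Return the coroutine function unchanged so asynchronous code can
    # still write `x = await func()`; attach a synchronous entry point.
    @functools.wraps(func)
    def sync_call(*args, loop=None, timeout=None, **kwargs):
        loop = loop or asyncio.get_event_loop()
        coro = func(*args, **kwargs)
        if timeout is not None:
            coro = asyncio.wait_for(coro, timeout)
        return loop.run_until_complete(coro)
    func.sync_call = sync_call
    return func

@sync_callable
async def double(x):
    await asyncio.sleep(0)
    return 2 * x

print(double.sync_call(21))  # prints 42, no explicit event loop needed

Whether get_event_loop() is the right loop to use is exactly the open
question above.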

--
Oscar

