[Python-ideas] Are there asynchronous generators?

Paul Sokolovsky pmiscml at gmail.com
Sun Jun 28 17:52:49 CEST 2015


Hello,

On Sun, 28 Jun 2015 12:02:01 +0200
Adam Bartoš <drekin at gmail.com> wrote:

> Is there a way for a producer to say that there will be no more items
> put, so consumers get something like StopIteration when there are no
> more items left afterwards?

Sure, just designate a sentinel value of your liking (the StopIteration
class itself seems an obvious choice) and use it for that purpose.

> There is also the problem that one cannot easily feed a queue,
> asynchronous generator, or any asynchronous iterator to a simple
> synchronous consumer like sum() or list() or "".join(). It would be
> nice if there was a way to wrap them to asynchronous ones when needed
> – something like (async sum)(asynchronously_produced_numbers()).

All that is easily achievable with classical Python coroutines, not
with asyncio's garden variety of coroutines, which were lately cast
into the language itself with the async/await keywords:

def coro1():
    yield 1
    yield 2
    yield 3

def coro2():
    # delegate to coro1, then yield more values of our own
    yield from coro1()
    yield 4
    yield 5

print(sum(coro2()))  # prints 15

And back to your starting question: it's also possible, and again only
with classical Python coroutines. I mentioned not just the possibility
but the necessity of that in my independent "reverse engineering" of
how yield from works,
https://dl.dropboxusercontent.com/u/44884329/yield-from.pdf (point 9
there). That's a simplistic presentation; in the presence of a
"syscall mainloop", the example there would need to be:


class MyValueWrapper:
    def __init__(self, v):
        self.v = v


def pump(ins, outs):
    for chunk in gen(ins):
        if isinstance(chunk, MyValueWrapper):
            # if the value we got from a coro is of
            # the type we expect, process it
            yield from outs.write(chunk.v)
        else:
            # anything else is simply not for us;
            # re-yield it to higher levels (ultimately, the mainloop)
            yield chunk


def gen(ins):
    yield MyValueWrapper("<b>")
    # Assume read_in_chunks() already yields MyValueWrapper objects
    yield from ins.read_in_chunks(1000*1000*1000)
    yield MyValueWrapper("</b>")
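To make the idea concrete, here's a toy, self-contained variant of the
above: a made-up Sleep object stands in for a "syscall", and a plain
list stands in for outs, so the whole value/syscall flow is visible:

```python
class MyValueWrapper:
    def __init__(self, v):
        self.v = v

class Sleep:
    """A made-up "syscall" request, understood only by the mainloop."""

def gen():
    yield MyValueWrapper("<b>")
    yield Sleep()                  # ask the mainloop to reschedule us
    yield MyValueWrapper("x")
    yield MyValueWrapper("</b>")

def pump(out):
    for chunk in gen():
        if isinstance(chunk, MyValueWrapper):
            out.append(chunk.v)    # "write" the wrapped value
        else:
            yield chunk            # re-yield syscalls to the mainloop

def mainloop(task):
    syscalls = 0
    for request in task:           # driving the generator runs pump()
        syscalls += 1              # a real loop would dispatch on type
    return syscalls

out = []
print(mainloop(pump(out)), out)    # prints 1 ['<b>', 'x', '</b>']
```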




> 
> 
> On Wed, Jun 24, 2015 at 1:54 PM, Jonathan Slenders
> <jonathan at slenders.be> wrote:
> 
> > In my experience, it's much easier to use asyncio Queues for this.
> > Instead of yielding, push to a queue. The consumer can then use
> > "await queue.get()".
> >
> > I think the semantics of the generator become too complicated
> > otherwise, or maybe impossible.
> > Maybe have a look at this article:
> > http://www.interact-sw.co.uk/iangblog/2013/11/29/async-yield-return
> >
> > Jonathan
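(For illustration, the Queue approach Jonathan describes might be
sketched like this; a rough example using a None sentinel to signal
the end of the stream, not his actual code:)

```python
import asyncio

async def producer(queue):
    for i in range(4):
        await queue.put(i)
    await queue.put(None)          # sentinel: no more items

async def consumer(queue):
    items = []
    while True:
        item = await queue.get()
        if item is None:           # producer is done
            break
        items.append(item)
    return items

async def main():
    queue = asyncio.Queue()
    _, items = await asyncio.gather(producer(queue), consumer(queue))
    return items

print(asyncio.run(main()))  # prints [0, 1, 2, 3]
```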
> >
> >
> >
> >
> > 2015-06-24 12:13 GMT+02:00 Andrew Svetlov
> > <andrew.svetlov at gmail.com>:
> >
> >> Your idea is clean and maybe we will allow `yield` inside `async
> >> def` in Python 3.6.
> >> For PEP 492 it was too big a change.
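(If that ever lands, usage might look something like the following
sketch: an async generator bridged to the synchronous sum() by
collecting values first. Purely speculative with respect to this
thread; none of it is valid Python 3.5:)

```python
# Speculative sketch of "yield inside async def": numbers() would be
# an async generator, iterated with "async for" from another coroutine.
import asyncio

async def numbers():
    for n in "1 2 3".split():
        await asyncio.sleep(0)   # pretend each value needs async work
        yield int(n)

async def total():
    values = []
    async for n in numbers():    # asynchronous iteration
        values.append(n)
    return sum(values)           # sum() itself stays synchronous

loop = asyncio.new_event_loop()
print(loop.run_until_complete(total()))  # prints 6
loop.close()
```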
> >>
> >> On Wed, Jun 24, 2015 at 12:00 PM, Adam Bartoš <drekin at gmail.com>
> >> wrote:
> >> > Hello,
> >> >
> >> > I had a generator producing pairs of values and wanted to feed
> >> > all the first members of the pairs to one consumer and all the
> >> > second members to another consumer. For example:
> >> >
> >> > def pairs():
> >> >     for i in range(4):
> >> >         yield (i, i ** 2)
> >> >
> >> > biconsumer(sum, list)(pairs()) -> (6, [0, 1, 4, 9])
> >> >
> >> > The point is I wanted the consumers to be suspended and resumed
> >> > in a coordinated manner: The first consumer is invoked and wants
> >> > the first element. The coordinator implemented by the biconsumer
> >> > function invokes pairs(), gets the first pair and yields its
> >> > first member to the first consumer. Then it wants the next
> >> > element, but now it's the second consumer's turn, so the first
> >> > consumer is suspended and the second consumer is invoked and fed
> >> > with the second member of the first pair. Then the second
> >> > consumer wants the next element, but it's the first consumer's
> >> > turn… and so on. In the end, when the stream of pairs is
> >> > exhausted, StopIteration is thrown to both consumers and their
> >> > results are combined.
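(For illustration, here's one classic-generator sketch of such a
biconsumer; not what you wrote, and note its limits: the alternation
via send() is real, but the synchronous consumers only see the
collected values at the end, so truly incremental consumption would
need the consumers themselves written as coroutines:)

```python
def feed(consumer):
    # Wrap a synchronous consumer (sum, list, ...) so it can be fed
    # one value at a time; the result is computed once feeding ends.
    items = []
    def sink():
        while True:
            items.append((yield))
    g = sink()
    next(g)                      # prime the generator
    return g, lambda: consumer(items)

def biconsumer(c1, c2):
    def run(pairs_iterable):
        g1, result1 = feed(c1)
        g2, result2 = feed(c2)
        for a, b in pairs_iterable:
            g1.send(a)           # first consumer's turn
            g2.send(b)           # then the second consumer's turn
        return result1(), result2()
    return run

def pairs():
    for i in range(4):
        yield (i, i ** 2)

print(biconsumer(sum, list)(pairs()))  # prints (6, [0, 1, 4, 9])
```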
> >> >
> >> > The cooperative asynchronous nature of the execution reminded
> >> > me of asyncio and coroutines, so I thought that biconsumer may
> >> > be implemented using them. However, it seems that it is
> >> > impossible to write an "asynchronous generator" since the
> >> > "yielding pipe" is already used for the communication with the
> >> > scheduler. And even if it was possible to make an asynchronous
> >> > generator, it is not clear how to feed it to a synchronous
> >> > consumer like the sum() or list() function.
> >> >
> >> > With PEP 492 the concepts of generators and coroutines were
> >> > separated, so asynchronous generators may be possible in
> >> > theory. An ordinary function has just the returning pipe – for
> >> > returning the result to the caller. A generator also has a
> >> > yielding pipe – used for yielding the values during iteration,
> >> > and its return pipe is used to finish the iteration. A native
> >> > coroutine has a returning pipe – to return the result to a
> >> > caller just like an ordinary function, and also an async pipe –
> >> > used for communication with a scheduler and execution
> >> > suspension. An asynchronous generator would just have both a
> >> > yielding pipe and an async pipe.
> >> >
> >> > So my question is: was code like the following considered? Does
> >> > it make sense? Or are there not enough use cases for such code?
> >> > I found only a short mention in
> >> > https://www.python.org/dev/peps/pep-0492/#coroutine-generators,
> >> > so possibly these coroutine-generators are the same idea.
> >> >
> >> > async def f():
> >> >     number_string = await fetch_data()
> >> >     for n in number_string.split():
> >> >         yield int(n)
> >> >
> >> > async def g():
> >> >     result = async/await? sum(f())
> >> >     return result
> >> >
> >> > async def h():
> >> >     the_sum = await g()
> >> >
> >> > As for an explanation of the execution of h() by an event loop:
> >> > h is a native coroutine called by the event loop, having both a
> >> > returning pipe and an async pipe. The returning pipe leads to
> >> > the end of the task, the async pipe is used for communication
> >> > with the scheduler. Then, g() is called asynchronously – using
> >> > the await keyword means that the access to the async pipe is
> >> > given to the callee. Then g() invokes the asynchronous generator
> >> > f() and gives it the access to its async pipe, so when f() is
> >> > yielding values to sum, it can also yield a future to the
> >> > scheduler via the async pipe and suspend the whole task.
> >> >
> >> > Regards, Adam Bartoš
> >> >
> >> >
> >> > _______________________________________________
> >> > Python-ideas mailing list
> >> > Python-ideas at python.org
> >> > https://mail.python.org/mailman/listinfo/python-ideas
> >> > Code of Conduct: http://python.org/psf/codeofconduct/
> >>
> >>
> >>
> >> --
> >> Thanks,
> >> Andrew Svetlov



-- 
Best regards,
 Paul                          mailto:pmiscml at gmail.com

