Additive merging of async generators

Hi,
I was wondering if anyone has had experience implementing a similar pattern to this, or has alternative suggestions for how to achieve it.
I have a bunch of async generators which I’d like to merge into a single async generator I can iterate over. I found Vincent’s aiostream library, which gives me this without too much effort:
from asyncio import sleep, run
from aiostream.stream import merge


async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100


async def main():
    tasks = merge(go(), go(), go())

    async for v in tasks:
        print(v)


if __name__ == '__main__':
    run(main())
However, I would also like to be able to add additional generators to the merge once I’ve started iterating it, so something more akin to:
from asyncio import sleep, run
from aiostream.stream import merge


async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100


async def main():
    tasks = merge(go(), go(), go())

    async for v in tasks:
        if v == 50:
            tasks.merge(go())
        print(v)


if __name__ == '__main__':
    run(main())
Has anyone been able to achieve something like this?
P.S. I know appending to a list you’re iterating over is bad practice, and I assume the same would be true of modifying this stream object, but I think the example illustrates what I’m trying to achieve.
Thanks, James

You are not appending to the list that's being iterated ;)
tasks is an async generator, or possibly a custom object that implements __aiter__, __anext__, etc.
I'd say look at the internals of aiostream's merge; it should not be too hard to extend.
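For instance, here is a rough sketch of the sort of wrapper that would give roughly the tasks.merge(go()) shape from your example, by feeding generators through a queue and flattening them with aiostream's flatten operator. AdditiveMerge, add and close are made-up names for this sketch, and it assumes flatten accepts a plain async generator as its source; treat it as an illustration rather than a tested solution:

import asyncio
from aiostream import stream


async def go():
    yield 0
    await asyncio.sleep(1)
    yield 50
    await asyncio.sleep(1)
    yield 100


class AdditiveMerge:
    # Made-up wrapper (not an aiostream API): generators are fed through a
    # queue and flattened, so new ones can join while the result is iterated.

    def __init__(self, *generators):
        self._queue = asyncio.Queue()
        for gen in generators:
            self._queue.put_nowait(gen)

    def add(self, generator):
        # Hand another generator to the running merge.
        self._queue.put_nowait(generator)

    def close(self):
        # Signal that nothing more will be added; the merge then finishes
        # once the generators already running are exhausted.
        self._queue.put_nowait(None)

    async def _sources(self):
        # Yield generators pulled off the queue until the close sentinel.
        while True:
            gen = await self._queue.get()
            if gen is None:
                return
            yield gen

    def __aiter__(self):
        # stream.flatten takes an async sequence of async sequences and runs
        # the inner ones concurrently, yielding values as they arrive.
        return stream.flatten(self._sources()).__aiter__()


async def main():
    tasks = AdditiveMerge(go(), go(), go())
    added = False
    async for v in tasks:
        print(v)
        if v == 50 and not added:
            added = True
            tasks.add(go())   # a fourth generator joins mid-iteration
            tasks.close()     # and nothing further after that


if __name__ == '__main__':
    asyncio.run(main())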

Looking into the internals of aiostream, it's meant to accept an async sequence of generators; see flatmap in advanced.py (perhaps some function other than merge() has to be used).
In which case, you could do something along the lines of:
async def tasks(some_queue):
    yield go()
    yield go()
    yield go()
    while True:
        try:
            yield (await some_queue.get())
        except Closed:
            return
And then:
q = ...

async for v in flatmap(tasks(q)):
    if x:
        q.put(something_new())
The downside of this implementation is that it's trivially easy to deadlock. You'd then probably try:
try:
    yield (await some_queue.get())
except Empty:
    return
Which would not block, but could conceivably drop the last added task.
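A possible middle ground (just a sketch; CLOSE is a made-up marker, not an aiostream or asyncio name) is to keep blocking on the queue but shut it down explicitly with a sentinel once the consumer knows no further generators will be added, so nothing gets dropped and the merge can still finish:

CLOSE = object()   # invented sentinel object

async def tasks(some_queue):
    yield go()
    yield go()
    yield go()
    while True:
        gen = await some_queue.get()
        if gen is CLOSE:   # consumer does some_queue.put_nowait(CLOSE)
            return         # once nothing more will be added
        yield gen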
A more straightforward implementation would be as follows; it's easier to validate and verify:
async def tasks(backlog=[]):
    while backlog:
        yield backlog.pop()
usage:
backlog = [go(), go(), go()]

async for v in flatmap(tasks(backlog)):
    if x:
        backlog.append(go())
Which of course doesn't do what you want -- you want "all" tasks to start executing at the same time and then change what "all" means :) This implies that the task generator needs to async-block, and then [magically] unblock when all tasks finish producing values (or are cancelled).
I think aiostream has enough tools to put together what you want.
If I were to hand-craft it, it would look something like this (pseudocode only):
myflatten(source):
    generators = []
    while generators or source-not-exhausted:
        item = await futures.wait(generators + [source], return_when=FIRST_COMPLETED)...
        if item is-a-generator:
            generators.append(item)
        else:
            yield item
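Fleshed out with plain asyncio, that hand-crafted version might look roughly like the sketch below. dynamic_merge and the None close sentinel are invented names for this example, and cleanup of leftover tasks on early exit is omitted:

import asyncio


async def go():                       # the generator from the original post
    yield 0
    await asyncio.sleep(1)
    yield 50
    await asyncio.sleep(1)
    yield 100


async def dynamic_merge(queue):
    # Merge async generators taken from `queue`; more can be added at any
    # time.  Putting None on the queue closes the merge once the generators
    # already running are exhausted.
    pending = {}                      # task awaiting a value -> its generator

    def schedule(gen):
        pending[asyncio.ensure_future(gen.__anext__())] = gen

    source_task = asyncio.ensure_future(queue.get())
    closing = False

    while pending or not closing:
        wait_for = set(pending)
        if not closing:
            wait_for.add(source_task)
        done, _ = await asyncio.wait(wait_for, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if task is source_task:
                gen = task.result()
                if gen is None:
                    closing = True            # no more generators will arrive
                else:
                    schedule(gen)
                    source_task = asyncio.ensure_future(queue.get())
                continue
            gen = pending.pop(task)
            try:
                value = task.result()
            except StopAsyncIteration:
                continue                      # this generator is exhausted
            yield value
            schedule(gen)                     # ask it for its next value
    # (Cancelling leftover tasks on early exit is left out for brevity.)


async def main():
    queue = asyncio.Queue()
    for _ in range(3):
        queue.put_nowait(go())

    added = False
    async for v in dynamic_merge(queue):
        print(v)
        if v == 50 and not added:
            added = True
            queue.put_nowait(go())            # a fourth generator joins here
            queue.put_nowait(None)            # and nothing more after that


if __name__ == '__main__':
    asyncio.run(main())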

Just wanted to reply now so these comments aren't going into the void.
These all look very helpful, thanks. Will sit down at the weekend and try and cobble something together from them.
Thanks again, James
participants (2):
- Dima Tisnek
- James Stidard