[New-bugs-announce] [issue41505] asyncio.gather of large streams with limited resources

Kevin Amado report at bugs.python.org
Fri Aug 7 21:24:55 EDT 2020

New submission from Kevin Amado <kamadorueda at gmail.com>:

Sometimes, when working on high-concurrency systems, developers need to run a large number of tasks concurrently while respecting a finite pool of resources.

Just to mention some examples:
- reading a lot of files asynchronously without exceeding the operating system's limit on open files
- making millions of requests to a website in batches small enough to avoid being banned by the site's firewall or hitting API limits
- making a lot of DNS lookups without exceeding the operating system's limit on open sockets
- and many more

What these examples have in common is a hard limit on the maximum concurrency that can be used to solve the problem.

A naive approach is to split the long list of tasks into small batches and use asyncio.gather on each batch. The downside is that if one task takes longer than the others, at some point it is the only task still running, and the next batch cannot start until it finishes, which hurts throughput and total execution time.
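To make the cost of batching concrete, here is a minimal sketch. The names `work` and `gather_in_batches` and the delays are made up for illustration and are not part of the attachment. The first batch contains one slow task, so the second batch only starts after that task finishes.

```python
import asyncio
import time

async def work(delay: float) -> float:
    # Hypothetical task: sleeps for `delay` seconds and returns it.
    await asyncio.sleep(delay)
    return delay

async def gather_in_batches(coros, batch_size):
    """Run coroutines batch by batch; each batch waits for its slowest member."""
    results = []
    for start in range(0, len(coros), batch_size):
        batch = coros[start:start + batch_size]
        results.extend(await asyncio.gather(*batch))
    return results

async def main():
    delays = [0.3, 0.05, 0.05, 0.05]
    started = time.perf_counter()
    results = await gather_in_batches([work(d) for d in delays], batch_size=2)
    elapsed = time.perf_counter() - started
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, round(elapsed, 2))
```

The total time is roughly the sum of the slowest task of each batch (about 0.3 + 0.05 seconds here), even though two slots were nominally available the whole time.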

Another approach is to use asyncio.wait on a subset of tasks, collecting the done tasks and adding more tasks from the remaining input until all tasks have been executed. This alternative is good but still far from optimal, as a lot of boilerplate code is needed.
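A sketch of what that boilerplate might look like, assuming the input is a plain iterable of coroutines. `work` and `run_with_wait` are illustrative names, not existing asyncio functions. Note that results come back in completion order, not input order.

```python
import asyncio

async def work(n: int) -> int:
    # Hypothetical task with a small, varying delay.
    await asyncio.sleep(0.01 * (n % 3))
    return n

async def run_with_wait(coros, limit):
    """Keep at most `limit` tasks pending, topping up as tasks finish."""
    coros = iter(coros)
    pending = set()
    results = []
    exhausted = False
    while True:
        # Top up the pending set from the remaining input.
        while not exhausted and len(pending) < limit:
            try:
                pending.add(asyncio.ensure_future(next(coros)))
            except StopIteration:
                exhausted = True
        if not pending:
            break
        # Block until at least one pending task has finished.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        results.extend(task.result() for task in done)
    return results

results = asyncio.run(run_with_wait((work(n) for n in range(10)), limit=3))
print(sorted(results))
```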

The ideal approach is to operate on the possibly infinite stream of tasks with a constant number of them being resolved concurrently. As soon as one of the concurrently executing tasks finishes, another one is fetched from the input stream and added to the set of executing ones. This makes the best use of the available resources while minimizing total execution time and never exceeding the finite pool of resources (sockets, open files, HTTP API limits, etc.).
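One possible way to sketch this idea is with a semaphore that gates how many tasks are in flight. This is not the code from the attached materialize.py; the name `bounded_stream` and the demo job are hypothetical, and the choice to yield results in input order is an assumption made for this sketch.

```python
import asyncio
from collections import deque

async def bounded_stream(aws, *, max_concurrency):
    """Lazily pull awaitables from `aws`, keeping at most `max_concurrency`
    in flight, and yield their results in input order."""
    slots = asyncio.Semaphore(max_concurrency)
    in_order = deque()  # tasks in the order they were read from the input
    for aw in aws:
        await slots.acquire()  # wait for a free slot before starting a task
        task = asyncio.ensure_future(aw)
        # Free the slot as soon as the task finishes, whatever its position.
        task.add_done_callback(lambda _task: slots.release())
        in_order.append(task)
        # Hand out results that are already available at the head.
        while in_order and in_order[0].done():
            yield in_order.popleft().result()
    # Input exhausted: drain the remaining tasks in order.
    while in_order:
        yield await in_order.popleft()

async def main():
    running = 0
    peak = 0

    async def job(n):
        # Hypothetical task that records how many jobs run at once.
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01 * (3 - n % 3))
        running -= 1
        return n

    stream = bounded_stream(map(job, range(8)), max_concurrency=3)
    results = [r async for r in stream]
    return results, peak

results, peak = asyncio.run(main())
print(results, peak)
```

A trade-off of this particular sketch: a slow task at the head of the queue lets finished results pile up behind it, so memory use is only bounded by the concurrency limit when task durations are similar.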

What I'm attaching is a proof of concept of a new function to add to the asyncio.tasks module that implements the ideal approach.

The proposed signature for this function is:

  async def materialize(aws, *, max_concurrency=None)

It works like this:

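import asyncio

async def do(n: int) -> int:
    print('running', n)
    await asyncio.sleep(1)
    print('returning', n)
    return n

async def main():
    result = []
    # materialize is the proposed function from the attached materialize.py
    async for x in materialize(map(do, range(5)), max_concurrency=2):
        print('got', x)
        result.append(x)
    print(result)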


The output is:

running 0
running 1
returning 0
returning 1
got 0
got 1
running 2
running 3
returning 2
returning 3
got 2
got 3
running 4
returning 4
got 4
[0, 1, 2, 3, 4]

As you can see, tasks are resolved concurrently without exceeding the maximum concurrency allowed, while always running as many tasks as the limit permits. Results are yielded as soon as they are available, the memory footprint stays small (proportional to the maximum concurrency allowed), and results are returned in the same order as the input stream (unlike asyncio.as_completed).

Since it's an asynchronous generator, it can deal with infinite input streams, which is nice!

I'm willing to work further on a PR.

components: asyncio
files: materialize.py
messages: 375028
nosy: asvetlov, kamadorueda, yselivanov
priority: normal
severity: normal
status: open
title: asyncio.gather of large streams with limited resources
type: enhancement
versions: Python 3.10
Added file: https://bugs.python.org/file49377/materialize.py

Python tracker <report at bugs.python.org>
