<div dir="ltr">Hi Maxime,<div><br></div><div>Many thanks for your great solution. It would be great to have it in stock asyncio and to use it out of the box...</div><div>I've made 4 fixes to it that are of a rather "cosmetic" nature. Here is the final version:</div>

<div class="gmail_extra"><br></div><div class="gmail_extra"><div class="gmail_extra">import asyncio</div><div class="gmail_extra">from concurrent import futures</div><div class="gmail_extra"><br></div><div class="gmail_extra">

<br></div><div class="gmail_extra">def as_completed_with_max_workers(tasks, *, loop=None, max_workers=5, timeout=None):</div><div class="gmail_extra">    loop = loop if loop is not None else asyncio.get_event_loop()</div>

<div class="gmail_extra">    workers = []</div><div class="gmail_extra">    pending = set()</div><div class="gmail_extra">    done = asyncio.Queue(maxsize=max_workers, loop=loop) # Valery: respect the "loop" parameter</div>

<div class="gmail_extra">    exhausted = False</div><div class="gmail_extra">    timeout_handle = None # Valery: added to track whether we indeed have to call timeout_handle.cancel()</div><div class="gmail_extra"><br></div><div class="gmail_extra">

    @asyncio.coroutine</div><div class="gmail_extra">    def _worker():</div><div class="gmail_extra">        nonlocal exhausted</div><div class="gmail_extra">        while not exhausted:</div><div class="gmail_extra">            try:</div>

<div class="gmail_extra">                t = next(tasks)</div><div class="gmail_extra">                pending.add(t)</div><div class="gmail_extra">                yield from t</div><div class="gmail_extra">                yield from done.put(t)</div>

<div class="gmail_extra">                pending.remove(t)</div><div class="gmail_extra">            except StopIteration:</div><div class="gmail_extra">                exhausted = True</div><div class="gmail_extra"><br>
</div>
<div class="gmail_extra">    def _on_timeout():</div><div class="gmail_extra">        for f in workers:</div><div class="gmail_extra">            f.cancel()</div><div class="gmail_extra">        workers.clear()</div><div class="gmail_extra">

        # Wake up _wait_for_one()</div><div class="gmail_extra">        done.put_nowait(None)</div><div class="gmail_extra"><br></div><div class="gmail_extra">    @asyncio.coroutine</div><div class="gmail_extra">    def _wait_for_one():</div>

<div class="gmail_extra">        f = yield from done.get()</div><div class="gmail_extra">        if f is None:</div><div class="gmail_extra">            raise futures.TimeoutError()</div><div class="gmail_extra">        return f.result()</div>

<div class="gmail_extra"><br></div><div class="gmail_extra">    workers = [asyncio.async(_worker(), loop=loop) for _ in range(max_workers)] # Valery: respect the "loop" parameter</div><div class="gmail_extra"><br>

</div><div class="gmail_extra">    if workers and timeout is not None:</div><div class="gmail_extra">        timeout_handle = loop.call_later(timeout, _on_timeout)</div><div class="gmail_extra"><br></div><div class="gmail_extra">

    while not exhausted or pending or not done.empty():</div><div class="gmail_extra">        yield _wait_for_one()</div><div class="gmail_extra"><br></div><div class="gmail_extra">    if timeout_handle: # Valery: call timeout_handle.cancel() only if it is needed</div>

<div class="gmail_extra">        timeout_handle.cancel()</div><div><br></div></div><div class="gmail_extra"><br clear="all"><div>best regards<br>--<br>Valery A.Khamenya</div>
<br><br></div></div>