<div dir="ltr">Hi Maxime,<div><br></div><div>many thanks for your great solution. It would be so great to have it in stock asyncio and use it out-of-the-box...</div><div>I've made 4 fixes to it that are rather of "cosmetic" nature. Here is the final version:</div>
<div class="gmail_extra"><br></div><div class="gmail_extra"><div class="gmail_extra">import asyncio</div><div class="gmail_extra">from concurrent import futures</div><div class="gmail_extra"><br></div><div class="gmail_extra">
<br></div><div class="gmail_extra">def as_completed_with_max_workers(tasks, *, loop=None, max_workers=5, timeout=None):</div><div class="gmail_extra"> loop = loop if loop is not None else asyncio.get_event_loop()</div>
<div class="gmail_extra"> workers = []</div><div class="gmail_extra"> pending = set()</div><div class="gmail_extra"> done = asyncio.Queue(maxsize=max_workers, loop=loop) # Valery: respect the "loop" parameter</div>
<div class="gmail_extra"> exhausted = False</div><div class="gmail_extra"> timeout_handle = None # Valery: added to see, if we indeed have to call timeout_handle.cancel()</div><div class="gmail_extra"><br></div><div class="gmail_extra">
@asyncio.coroutine</div><div class="gmail_extra"> def _worker():</div><div class="gmail_extra"> nonlocal exhausted</div><div class="gmail_extra"> while not exhausted:</div><div class="gmail_extra"> try:</div>
<div class="gmail_extra"> t = next(tasks)</div><div class="gmail_extra"> pending.add(t)</div><div class="gmail_extra"> yield from t</div><div class="gmail_extra"> yield from done.put(t)</div>
<div class="gmail_extra"> pending.remove(t)</div><div class="gmail_extra"> except StopIteration:</div><div class="gmail_extra"> exhausted = True</div><div class="gmail_extra"><br>
</div>
<div class="gmail_extra"> def _on_timeout():</div><div class="gmail_extra"> for f in workers:</div><div class="gmail_extra"> f.cancel()</div><div class="gmail_extra"> workers.clear()</div><div class="gmail_extra">
# Wake up _wait_for_one()</div><div class="gmail_extra"> done.put_nowait(None)</div><div class="gmail_extra"><br></div><div class="gmail_extra"> @asyncio.coroutine</div><div class="gmail_extra"> def _wait_for_one():</div>
<div class="gmail_extra"> f = yield from done.get()</div><div class="gmail_extra"> if f is None:</div><div class="gmail_extra"> raise futures.TimeoutError()</div><div class="gmail_extra"> return f.result()</div>
<div class="gmail_extra"><br></div><div class="gmail_extra"> workers = [asyncio.async(_worker(), loop=loop) for _ in range(max_workers)] # Valery: respect the "loop" parameter</div><div class="gmail_extra"><br>
</div><div class="gmail_extra"> if workers and timeout is not None:</div><div class="gmail_extra"> timeout_handle = loop.call_later(timeout, _on_timeout)</div><div class="gmail_extra"><br></div><div class="gmail_extra">
while not exhausted or pending or not done.empty():</div><div class="gmail_extra"> yield _wait_for_one()</div><div class="gmail_extra"><br></div><div class="gmail_extra"> if timeout_handle: # Valery: call timeout_handle.cancel() only if it is needed</div>
<div class="gmail_extra"> timeout_handle.cancel()</div><div><br></div></div><div class="gmail_extra"><br clear="all"><div>best regards<br>--<br>Valery A.Khamenya</div>
<br><br></div></div>