Anything better than asyncio.as_completed() and asyncio.wait() to manage execution of large amount of tasks?

CHIN Dihedral dihedral88888 at gmail.com
Tue Jul 22 18:20:55 CEST 2014


On Thursday, July 17, 2014 7:09:02 AM UTC+8, Maxime Steisel wrote:
> 2014-07-15 14:20 GMT+02:00 Valery Khamenya <khamenya at gmail.com>:
> > Hi,
> >
> > both asyncio.as_completed() and asyncio.wait() work with lists only. No
> > generators are accepted. Is there anything similar to those functions that
> > pulls Tasks/Futures/coroutines one-by-one and processes them in a limited
> > task pool?
>
> Something like this (adapted from as_completed) should do the work:
>
> import asyncio
> from concurrent import futures
>
> def parallelize(tasks, *, loop=None, max_workers=5, timeout=None):
>     loop = loop if loop is not None else asyncio.get_event_loop()
>     workers = []
>     pending = set()
>     done = asyncio.Queue(maxsize=max_workers)
>     exhausted = False
>     # Stays None when no timeout is requested, so the final cancel is safe.
>     timeout_handle = None
>
>     @asyncio.coroutine
>     def _worker():
>         # Pull the next task from the iterator, wait for it, then queue it.
>         nonlocal exhausted
>         while not exhausted:
>             try:
>                 t = next(tasks)
>                 pending.add(t)
>                 yield from t
>                 yield from done.put(t)
>                 pending.remove(t)
>             except StopIteration:
>                 exhausted = True
>
>     def _on_timeout():
>         for f in workers:
>             f.cancel()
>         workers.clear()
>         # Wake up _wait_for_one()
>         done.put_nowait(None)
>
>     @asyncio.coroutine
>     def _wait_for_one():
>         f = yield from done.get()
>         if f is None:
>             raise futures.TimeoutError()
>         return f.result()
>
>     workers = [asyncio.async(_worker()) for i in range(max_workers)]
>
>     if workers and timeout is not None:
>         timeout_handle = loop.call_later(timeout, _on_timeout)
>
>     while not exhausted or pending or not done.empty():
>         yield _wait_for_one()
>
>     if timeout_handle is not None:
>         timeout_handle.cancel()

Well, I think your flow of logic is missing task managers that
supervise the workers.

I suggest a better version would use a global signal that ticks at 8 to 16
times the normal worker pace, in order to cope with asynchronous events
promptly. The workers could be decorated to yield, rather than having the
yields written inside the workers' functions.
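
For what it is worth, here is a minimal sketch of the "pull one at a time
into a limited pool" idea from the original question, written in current
async/await syntax. It is not the code quoted above. The names run_limited,
job and demo are made up for illustration, and it assumes Python 3.7 or
later and that the input is any iterable (a generator is fine) of
awaitables. Error handling is left out: if one job raises, gather()
propagates the first exception while the other workers keep running.

```python
import asyncio


async def run_limited(awaitables, max_workers=5):
    """Run awaitables pulled lazily from an iterable, at most max_workers at once.

    Hypothetical helper; results are collected in completion order.
    """
    it = iter(awaitables)  # shared by all workers
    results = []

    async def worker():
        # Each worker asks for the next item only when it is idle, so the
        # generator is never consumed further ahead than max_workers items.
        for aw in it:
            results.append(await aw)

    await asyncio.gather(*(worker() for _ in range(max_workers)))
    return results


async def job(i, state):
    # Illustrative job that records how many jobs run at the same time.
    state["running"] += 1
    state["peak"] = max(state["peak"], state["running"])
    await asyncio.sleep(0.01)
    state["running"] -= 1
    return i * i


def demo():
    state = {"running": 0, "peak": 0}
    jobs = (job(i, state) for i in range(20))  # a generator, not a list
    results = asyncio.run(run_limited(jobs, max_workers=4))
    return sorted(results), state["peak"]


if __name__ == "__main__":
    print(demo())
```

Because the event loop is single-threaded and the shared iterator is only
advanced between awaits, the workers do not need a lock around it. The
workers here play the role of the task managers: the jobs themselves stay
plain coroutines and know nothing about the pool.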


