Anything better than asyncio.as_completed() and asyncio.wait() to manage execution of large amount of tasks?
CHIN Dihedral
dihedral88888 at gmail.com
Tue Jul 22 12:20:55 EDT 2014
On Thursday, July 17, 2014 7:09:02 AM UTC+8, Maxime Steisel wrote:
> 2014-07-15 14:20 GMT+02:00 Valery Khamenya <khamenya at gmail.com>:
> > Hi,
> >
> > both asyncio.as_completed() and asyncio.wait() work with lists only. No
> > generators are accepted. Is there anything similar to those functions that
> > pulls Tasks/Futures/coroutines one-by-one and processes them in a limited
> > task pool?
>
> Something like this (adapted from as_completed) should do the work:
>
> import asyncio
> from concurrent import futures
>
> def parallelize(tasks, *, loop=None, max_workers=5, timeout=None):
>     loop = loop if loop is not None else asyncio.get_event_loop()
>     workers = []
>     pending = set()
>     done = asyncio.Queue(maxsize=max_workers)
>     exhausted = False
>
>     @asyncio.coroutine
>     def _worker():
>         nonlocal exhausted
>         while not exhausted:
>             try:
>                 t = next(tasks)
>                 pending.add(t)
>                 yield from t
>                 yield from done.put(t)
>                 pending.remove(t)
>             except StopIteration:
>                 exhausted = True
>
>     def _on_timeout():
>         for f in workers:
>             f.cancel()
>         workers.clear()
>         # Wake up _wait_for_one()
>         done.put_nowait(None)
>
>     @asyncio.coroutine
>     def _wait_for_one():
>         f = yield from done.get()
>         if f is None:
>             raise futures.TimeoutError()
>         return f.result()
>
>     workers = [asyncio.async(_worker()) for i in range(max_workers)]
>
>     # Only set when a timeout was requested
>     timeout_handle = None
>     if workers and timeout is not None:
>         timeout_handle = loop.call_later(timeout, _on_timeout)
>
>     while not exhausted or pending or not done.empty():
>         yield _wait_for_one()
>
>     if timeout_handle is not None:
>         timeout_handle.cancel()

Well, I think your flow of logic is missing
task managers acting as workers.
I suggest a better version would use a global signal
running at 8 to 16 times the clock of the normal worker pace, in
order to cope with asynchronous events accordingly.
The workers could then be decorated to yield, rather than yielding inside the workers' functions.
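
For comparison, here is a minimal sketch of the bounded worker pool idea
from the quoted message, written with async/await and an async generator
(Python 3.7 or later is assumed). It is not Maxime's code: the parallelize()
below is a re-sketch under my own assumptions, the timeout handling is left
out, and fetch() is a hypothetical stand-in for real work. The workers pull
coroutines from the iterator one at a time, so a generator is accepted and
at most max_workers coroutines are in flight.

```python
import asyncio


async def parallelize(coros, *, max_workers=5):
    """Yield results in completion order, at most max_workers at a time.

    `coros` may be any iterable of coroutines, including a generator;
    the next item is pulled only when a worker becomes free.
    """
    it = iter(coros)
    # Bounded queue: workers wait if the consumer falls behind.
    results = asyncio.Queue(maxsize=max_workers)
    worker_done = object()  # sentinel put by each worker when it exits

    async def worker():
        # next() on the shared iterator never awaits, so the workers
        # cannot interleave inside it on a single event loop.
        for coro in it:
            try:
                item = (True, await coro)
            except Exception as exc:
                item = (False, exc)
            await results.put(item)
        await results.put(worker_done)

    workers = [asyncio.ensure_future(worker()) for _ in range(max_workers)]
    running = len(workers)
    try:
        while running:
            item = await results.get()
            if item is worker_done:
                running -= 1
                continue
            ok, value = item
            if not ok:
                raise value  # re-raise the failure in the consumer
            yield value
    finally:
        # Stop any workers still alive if the consumer exits early.
        for w in workers:
            w.cancel()


async def fetch(i):
    # Hypothetical job: sleeps briefly, then returns a value.
    await asyncio.sleep(0.01 * (i % 3))
    return i * i


async def main():
    jobs = (fetch(i) for i in range(10))  # a generator, not a list
    return [r async for r in parallelize(jobs, max_workers=3)]


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```

Results arrive in completion order, not submission order, which is why the
usage above sorts them before printing. Whether this covers the "task
managers" meant above is a judgment call; it only shows the limited pool.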