A more flexible task creation
I was working on concurrency-limiting code for asyncio, so that the user may submit as many tasks as they want, but only a maximum number of tasks will be submitted to the event loop at the same time.
However, I wanted passing an awaitable to always return a task, no matter whether the task was currently scheduled or not. The goal is that you could add done callbacks to it, decide to force-schedule it, etc.
I dug into the asyncio.Task code, and encountered:
    def __init__(self, coro, *, loop=None):
        ...
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)
I was surprised to see that instantiating a Task has any side effect at all, let alone two, one of which is scheduling it immediately for execution.
I couldn't find a clean way to do what I wanted: either you call loop.create_task() and get a task, but it runs; or you don't run anything, and you don't get a nice task object to hold on to.
I tried several alternatives, like returning a future and binding the awaiting of that future to the submission of a task, but that was complicated code that duplicated a lot of things.
I tried creating a custom task, but it was even harder: it meant setting a custom event loop policy to provide a custom event loop with my own create_task() accepting parameters. That's a lot to do just to provide a parameter to Task, especially if you already use a custom event loop (e.g. uvloop). I was expecting to have to create only a task factory, but task factories can't get any additional parameters from create_task().
Additionally, I can't use ensure_future(), as it doesn't allow passing any parameter to the underlying Task, so if I want to accept any awaitable in my signature, I need to provide my own custom ensure_future().
All those implementations access a lot of _private_api and do other shady things that linters hate; plus they are fragile at best. What's more, Task being rewritten in C prevents things like setting self._coro, so we can only inherit from the slow pure-Python version.
In the end, I can't even await the lazy task, because it blocks the entire program.
Hence I have 2 distinct proposals, related but independent:
- Allow a Task to be created without being scheduled for execution, and add a parameter to ensure_future() and create_task() to control this. Awaiting such a task would just behave like asyncio.sleep(0) until it is scheduled for execution.
- Add a parameter to ensure_future() and create_task() named "kwargs" that accepts a mapping and is passed as **kwargs to the underlying created Task.
I insist that the 2 proposals are independent, so please don't reject both if you don't like one or the other. Passing a parameter to the underlying custom Task is still of value even without the unscheduled instantiation, and vice versa.
Also, if somebody has any idea on how to make a LazyTask that we can await without blocking everything, I'll take it.
On Wed, Jun 13, 2018 at 4:47 PM Michel Desmoulin
I was working on concurrency-limiting code for asyncio, so that the user may submit as many tasks as they want, but only a maximum number of tasks will be submitted to the event loop at the same time.
What does that "concurrency limiting code" do? What problem does it solve?
However, I wanted passing an awaitable to always return a task, no matter whether the task was currently scheduled or not. The goal is that you could add done callbacks to it, decide to force-schedule it, etc.
The obvious advice is to create a new class "DelayedTask" with a Future-like API. You can then schedule the real awaitable that it wraps with `loop.create_task` at any point. Providing an "add_done_callback"-like API is trivial. DelayedTask can itself be an awaitable, scheduling itself on the first __await__ call. As a benefit, your implementation will support any Task-like objects that alternative asyncio loops can implement. No need to mess with policies either.
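A minimal sketch of what such a wrapper could look like, assuming it is only ever scheduled from inside a running event loop. The class name follows the suggestion above; the schedule() method, the internal attributes, and the demo coroutine are illustrative assumptions, not an existing asyncio API. Note that in this sketch done callbacks receive the underlying Task, not the wrapper.

```python
import asyncio


class DelayedTask:
    """Hypothetical wrapper: keeps a coroutine unscheduled until asked to run it."""

    def __init__(self, coro):
        self._coro = coro
        self._task = None        # the real asyncio.Task, created lazily
        self._callbacks = []     # callbacks registered before scheduling

    def schedule(self):
        """Create the real Task (once) on the running loop and return it."""
        if self._task is None:
            self._task = asyncio.get_running_loop().create_task(self._coro)
            for callback in self._callbacks:
                self._task.add_done_callback(callback)
            self._callbacks.clear()
        return self._task

    def add_done_callback(self, callback):
        """Register a callback now, or defer it until the Task exists."""
        if self._task is None:
            self._callbacks.append(callback)
        else:
            self._task.add_done_callback(callback)

    def __await__(self):
        # The first await schedules the wrapped coroutine, then waits for it.
        return self.schedule().__await__()


async def main():
    started = []

    async def job(name):
        started.append(name)
        return name.upper()

    lazy = DelayedTask(job("a"))
    seen = []
    lazy.add_done_callback(lambda task: seen.append(task.result()))

    await asyncio.sleep(0)           # give the loop a turn
    not_started_yet = started == []  # nothing ran: no Task exists yet

    result = await lazy              # schedules the job and waits for it
    await asyncio.sleep(0)           # let any pending done callbacks run
    return not_started_yet, result, seen


not_started_yet, result, seen = asyncio.run(main())
```

Because the wrapper only relies on create_task(), it should work with whatever Task-like object the active loop produces.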
I dug into the asyncio.Task code, and encountered:
    def __init__(self, coro, *, loop=None):
        ...
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)
I was surprised to see that instantiating a Task has any side effect at all, let alone two, one of which is scheduling it immediately for execution.
To be fair, implicitly scheduling a task for execution is what all async frameworks (twisted, curio, trio) do when you wrap a coroutine into a task. I don't recall them having a keyword argument to control when the task is scheduled.
I couldn't find a clean way to do what I wanted: either you call loop.create_task() and get a task, but it runs; or you don't run anything, and you don't get a nice task object to hold on to.
A clean way is to create a new layer of abstraction (e.g. DelayedTask I suggested above). [..]
I tried creating a custom task, but it was even harder: it meant setting a custom event loop policy to provide a custom event loop with my own create_task() accepting parameters. That's a lot to do just to provide a parameter to Task, especially if you already use a custom event loop (e.g. uvloop). I was expecting to have to create only a task factory, but task factories can't get any additional parameters from create_task().
I don't think creating a new Task implementation is needed here, a simple wrapper should work just fine. [..]
Hence I have 2 distinct proposals, related but independent:
- Allow a Task to be created without being scheduled for execution, and add a parameter to ensure_future() and create_task() to control this. Awaiting such a task would just behave like asyncio.sleep(0) until it is scheduled for execution.
- Add a parameter to ensure_future() and create_task() named "kwargs" that accepts a mapping and is passed as **kwargs to the underlying created Task.
I insist that the 2 proposals are independent, so please don't reject both if you don't like one or the other. Passing a parameter to the underlying custom Task is still of value even without the unscheduled instantiation, and vice versa.
Well, to add a 'kwargs' parameter to ensure_future() we need kwargs in Task.__init__. So far we only have 'loop' and it's not something that ensure_future() should allow you to override. So unless we implement the first proposal, we don't need the second.
Yury
How about:
    async def wait_to_run(async_fn, *args):
        await wait_for_something()
        return await async_fn(*args)
task = loop.create_task(wait_to_run(myfunc, ...))
-----
Whatever strategy you use, you should also think about what semantics you
want if one of these delayed tasks is cancelled before it starts.
For regular, non-delayed tasks, Trio makes sure that even if it gets
cancelled before it starts, then it still gets scheduled and runs until the
first cancellation point. This is necessary for correct resource hand-off
between tasks:
    async def some_task(handle):
        with handle:
            await ...
If we skipped running this task entirely, then the handle wouldn't be
closed properly; scheduling it once allows the with block to run, and then
get cleaned up by the cancellation exception. I'm not sure but I think
asyncio handles pre-cancellation in a similar way. (Yury, do you know?)
Now, in the delayed task case, there's a similar issue. If you want to keep the
same solution, then you might want to instead write:
    # asyncio
    async def wait_to_run(async_fn, *args):
        try:
            await wait_for_something()
        except asyncio.CancelledError:
            # have to create a subtask to make it cancellable
            subtask = loop.create_task(async_fn(*args))
            # then cancel it immediately
            subtask.cancel()
            # and wait for the cancellation to be processed
            return await subtask
        else:
            return await async_fn(*args)
In trio, this could be simplified to
    # trio
    async def wait_to_run(async_fn, *args):
        try:
            await wait_for_something()
        except trio.Cancelled:
            pass
        return await async_fn(*args)
(This works because of trio's "stateful cancellation" – if the whole thing
is cancelled, then as soon as async_fn hits a cancellation point the
exception will be re-delivered.)
-n
On 14/06/2018 at 04:09, Nathaniel Smith wrote:
How about:
    async def wait_to_run(async_fn, *args):
        await wait_for_something()
        return await async_fn(*args)
task = loop.create_task(wait_to_run(myfunc, ...))
It's quite elegant, although figuring out the wait_for_something() is going to be tricky.
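For the concurrency-limiting use case described at the start of the thread, one possible wait_for_something() is simply acquiring a slot from an asyncio.Semaphore. The sketch below is an assumption about how that could look, not code from the thread: the extra semaphore parameter, the limit of 2, and the work() coroutine are all illustrative. The wrapper task is scheduled immediately, but the wrapped function only starts once a slot is free.

```python
import asyncio


async def wait_to_run(semaphore, async_fn, *args):
    # "wait_for_something()" is modelled here as waiting for a free slot.
    async with semaphore:
        return await async_fn(*args)


async def main():
    semaphore = asyncio.Semaphore(2)   # at most 2 jobs run at the same time
    running = 0
    peak = 0

    async def work(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)      # record the highest concurrency seen
        await asyncio.sleep(0.01)
        running -= 1
        return i * i

    # Every submission returns a real Task right away, so callers can add
    # done callbacks or cancel it even before the job itself has started.
    tasks = [
        asyncio.ensure_future(wait_to_run(semaphore, work, i))
        for i in range(6)
    ]
    results = await asyncio.gather(*tasks)
    return results, peak


results, peak = asyncio.run(main())
```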
-----
Whatever strategy you use, you should also think about what semantics you want if one of these delayed tasks is cancelled before it starts.
For regular, non-delayed tasks, Trio makes sure that even if it gets cancelled before it starts, then it still gets scheduled and runs until the first cancellation point. This is necessary for correct resource hand-off between tasks:
    async def some_task(handle):
        with handle:
            await ...
If we skipped running this task entirely, then the handle wouldn't be closed properly; scheduling it once allows the with block to run, and then get cleaned up by the cancellation exception. I'm not sure but I think asyncio handles pre-cancellation in a similar way. (Yury, do you know?)
Now, in the delayed task case, there's a similar issue. If you want to keep the same solution, then you might want to instead write:
    # asyncio
    async def wait_to_run(async_fn, *args):
        try:
            await wait_for_something()
        except asyncio.CancelledError:
            # have to create a subtask to make it cancellable
            subtask = loop.create_task(async_fn(*args))
            # then cancel it immediately
            subtask.cancel()
            # and wait for the cancellation to be processed
            return await subtask
        else:
            return await async_fn(*args)
In trio, this could be simplified to
    # trio
    async def wait_to_run(async_fn, *args):
        try:
            await wait_for_something()
        except trio.Cancelled:
            pass
        return await async_fn(*args)
(This works because of trio's "stateful cancellation" – if the whole thing is cancelled, then as soon as async_fn hits a cancellation point the exception will be re-delivered.)
Thanks for the tip. It schedules it in all cases, but I don't know what asyncio does with it. I'll add a unit test for that.
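A sketch of what such a check could look like, written against plain asyncio rather than the code discussed in this thread. The body() coroutine and the started list are illustrative names. As far as I understand CPython's Task implementation, cancelling a task before its first step causes CancelledError to be thrown into the coroutine before any of its body runs, so a with block like the one above would never be entered; treat that as something to verify on the Python version you target.

```python
import asyncio


async def main():
    started = []

    async def body():
        # Would only be reached if the task ran up to its first await.
        started.append(True)
        await asyncio.sleep(0)

    task = asyncio.ensure_future(body())
    task.cancel()                      # cancelled before its first step
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), bool(started)


cancelled, body_started = asyncio.run(main())
```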
participants (3)
- Michel Desmoulin
- Nathaniel Smith
- Yury Selivanov