asyncio: futures and tasks with synchronous callbacks

In asyncio, when a task awaits another task (or future), it can be cancelled right after the awaited task has finished but before the callbacks have been processed. Thus, if the awaited task has consumed data, that data is lost. For instance, with the following code:

    import asyncio

    available_data = []
    data_ready = asyncio.Future()

    def feed_data(data):
        global data_ready
        available_data.append(data)
        data_ready.set_result(None)
        data_ready = asyncio.Future()

    async def consume_data():
        while not available_data:
            await asyncio.shield(data_ready)
        return available_data.pop()

    async def wrapped_consumer():
        task = asyncio.ensure_future(consume_data())
        return await task

If I perform these exact steps:

    async def test():
        task = asyncio.ensure_future(wrapped_consumer())
        await asyncio.sleep(0)
        feed_data('data')
        await asyncio.sleep(0)
        task.cancel()
        await asyncio.sleep(0)
        print('task', task)
        print('available_data', available_data)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

then I can see that the task has been cancelled even though the data was consumed. Since the result of `wrapped_consumer` cannot be retrieved, the data is lost forever:

    task <Task cancelled coro=<wrapped_consumer() done, defined at <ipython-input-1-de4ad193b1d0>:17>>
    available_data []

This side effect does not happen when awaiting a coroutine directly, but coroutines are not as flexible as tasks. It does happen when awaiting a `Future`, a `Task`, or any function like `asyncio.wait`, `asyncio.wait_for` or `asyncio.gather` (which all inherit from or use `Future`).
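The race above can also be reproduced in a single self-contained script. The sketch below assumes Python 3.7+ (`asyncio.run`); `demo_lost_result` and the pre-filled buffer are hypothetical simplifications, not part of the original report. The outer task is cancelled after the inner task has already popped the data, but before the outer task's queued wake-up callback has run.

```python
import asyncio


async def demo_lost_result():
    """Cancel an outer task in the window between its inner task finishing
    and the outer task's wake-up callback running."""
    available_data = ["data"]   # hypothetical pre-filled buffer
    consumed = []

    async def consume_data():
        item = available_data.pop()   # side effect: the item leaves the buffer
        consumed.append(item)
        return item

    async def wrapped_consumer():
        task = asyncio.ensure_future(consume_data())
        return await task             # the wake-up is scheduled via call_soon

    outer = asyncio.ensure_future(wrapped_consumer())
    await asyncio.sleep(0)   # outer starts and schedules the inner task
    await asyncio.sleep(0)   # inner task completes; outer's wake-up is queued
    outer.cancel()           # arrives before the queued wake-up runs
    try:
        await outer
    except asyncio.CancelledError:
        pass
    return outer.cancelled(), available_data, consumed


if __name__ == "__main__":
    print(asyncio.run(demo_lost_result()))
```

Running it prints `(True, [], ['data'])`: the outer task ends up cancelled while the item has already left the buffer, so nobody can retrieve it.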
There is then no way to write anything equivalent to:

    stop_future = asyncio.Future()

    async def wrapped_consumer2():
        task = asyncio.ensure_future(consume_data())
        try:
            await asyncio.wait([task, stop_future])
        finally:
            task.cancel()
            await asyncio.wait([task])
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

This is due to `Future` calling its callbacks asynchronously (https://github.com/python/cpython/blob/3.6/Lib/asyncio/futures.py#L214):

    for callback in callbacks:
        self._loop.call_soon(callback, self)

I propose to create synchronous versions of those, or a `synchronous_callback` parameter that makes the callbacks of a `Future` synchronous. I've experimented with a simple library, `syncio`, on CPython 3.6 to do this (later versions are harder to patch due to the massive use of private methods). Basically, it needs to:

1) replace the `Future._schedule_callbacks` method with a synchronous version;
2) fix `Task._step` so that it does not fail when cleaning `_current_tasks` (https://github.com/python/cpython/blob/3.6/Lib/asyncio/tasks.py#L245);
3) rewrite all the functions to use synchronous futures instead of normal ones.

With that library, the previous functions become possible and intuitive:

    import syncio

    async def wrapped_consumer():
        task = syncio.ensure_sync_future(consume_data())
        return await task

    stop_future = asyncio.Future()

    async def wrapped_consumer2():
        task = syncio.ensure_sync_future(consume_data())
        try:
            await syncio.sync_wait([task, stop_future])
        finally:
            task.cancel()
            await asyncio.wait([task])
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

There is no need to use `syncio` anywhere else in the code, which makes it totally transparent for the end user. This "library" can be found here: https://github.com/aure-olli/aiokafka/blob/216b4ad3b4115bc9fa463e44fe23636bd... It implements `SyncFuture`, `SyncTask`, `ensure_sync_future`, `sync_wait`, `sync_wait_for`, `sync_gather` and `sync_shield`. It works with CPython 3.6 only.
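The asynchronous dispatch described above can be observed with the public API alone. In this sketch (Python 3.7+; `show_callback_timing` is a hypothetical name), a done-callback does not run inside `set_result()`; it only runs once the coroutine yields control back to the event loop. That gap is the window in which a pending cancellation can slip in.

```python
import asyncio


async def show_callback_timing():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    events = []

    fut.add_done_callback(lambda f: events.append("callback"))
    fut.set_result(None)                 # schedules the callback with call_soon
    events.append("after set_result")    # the callback has not run yet

    await asyncio.sleep(0)               # yield once so the loop drains its ready queue
    events.append("after sleep(0)")
    return events


if __name__ == "__main__":
    print(asyncio.run(show_callback_timing()))
```

This prints `['after set_result', 'callback', 'after sleep(0)']`.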
To conclude:
- asynchronous callbacks are preferable in most cases, but do not provide a coherent cancelled status in specific cases;
- implementing a version with synchronous callbacks (or a `synchronous_callback` parameter) is rather easy (however, step 2 needs to be clarified; there is probably a cleaner way to fix it);
- it is totally transparent for the end user, as synchronous callbacks are fully compatible with asynchronous ones.

It seems inevitable that if you use await twice you risk being cancelled in between. The solution is to use only a single await to do all the work, like asyncio.Queue does (and why not use that for your use case?). I don't think inventing a parallel API of synchronous callbacks is a good idea -- as you say, there's a good reason why callbacks are asynchronous, and having two subtly different ways of doing things seems more confusing than helpful. Async I/O is already complicated -- let's not make it more so.

On Tue, Jul 30, 2019 at 2:28 AM <aurelien.lambert.89@gmail.com> wrote:
-- --Guido van Rossum (python.org/~guido) *Pronouns: he/him/his **(why is my pronoun here?)* <http://feministing.com/2015/02/03/how-using-they-as-a-singular-pronoun-can-c...>
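As an illustration of the single-await approach, the sketch below (Python 3.7+; `queue_get_survives_cancel` is a hypothetical name) races a cancellation against `asyncio.Queue.get()` in the same window as in the original example. Based on CPython's implementation, `get()` waits on an internal future and only removes the item with `get_nowait()` after it resumes, so a cancellation that wins the race leaves the item in the queue instead of losing it.

```python
import asyncio


async def queue_get_survives_cancel():
    queue = asyncio.Queue()
    getter = asyncio.ensure_future(queue.get())
    await asyncio.sleep(0)     # getter starts and blocks on the empty queue

    queue.put_nowait("data")   # wakes the getter's internal future (wake-up is queued)
    getter.cancel()            # arrives before the getter actually resumes
    try:
        await getter
    except asyncio.CancelledError:
        pass
    # The getter is cancelled, but the item was never taken out of the queue.
    return getter.cancelled(), queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(queue_get_survives_cancel()))
```

This prints `(True, 1)`: the cancelled getter did not consume the item, so another consumer can still receive it.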

Oh, only now does it appear in the list! I thought the post hadn't worked, so I posted again :/. I've fixed my "library" (https://github.com/aure-olli/aiokafka/blob/3acb88d6ece4502a78e230b234f47b90b...) and the `wrapped_consumer2` function. Now there is no double await, so no risk of a later cancellation.

    stop_future = asyncio.Future()

    async def wrapped_consumer2():
        task = asyncio.ensure_future(consume_data())
        try:
            await asyncio.wait([task, stop_future])
        finally:
            task.cancel()
        # cancel() only requests cancellation: a task that was still pending
        # is neither done nor cancelled yet, so check done() before result().
        if task.done() and not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

Or:

    import syncio

    async def wrapped_consumer():
        task = syncio.ensure_sync_future(consume_data())
        return await task

    stop_future = asyncio.Future()

    async def wrapped_consumer2():
        task = syncio.ensure_sync_future(consume_data())
        try:
            await syncio.sync_wait([task, stop_future])
        finally:
            task.cancel()
        if task.done() and not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

My only concern is that a consistent cancellation state is currently almost impossible to achieve with futures and tasks. The only two possibilities are either to ignore cancellation (highly counterintuitive to use), or to manipulate the coroutine directly as a generator (basically rewriting asyncio). This could be another library, for sure, but the current state of asyncio makes it really hard to reuse. It would mean copy-pasting the whole library while changing a few lines here and there.
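Whether `task.cancelled()` is meaningful right after `task.cancel()` depends on when the cancellation takes effect. With plain asyncio, `Task.cancel()` only requests cancellation: a task that is still pending is neither done nor cancelled until the event loop has run it again. A small sketch, assuming Python 3.7+ (`cancel_is_a_request` and `sleeper` are hypothetical names):

```python
import asyncio


async def cancel_is_a_request():
    async def sleeper():
        await asyncio.sleep(3600)

    task = asyncio.ensure_future(sleeper())
    await asyncio.sleep(0)               # let the task start and block in sleep()

    requested = task.cancel()            # True: the request was accepted
    right_after = (task.done(), task.cancelled())

    await asyncio.wait([task])           # let the task process the CancelledError
    after_wait = (task.done(), task.cancelled())
    return requested, right_after, after_wait


if __name__ == "__main__":
    print(asyncio.run(cancel_is_a_request()))
```

This prints `(True, (False, False), (True, True))`.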

Which means you cancel a running task but still have to wait for it and check whether it eventually produced a result. This is OK for internal use, but highly counterintuitive for the end user. It also makes the cancelled status even more inconsistent, since calling cancel() on a running task does not ensure it will actually be cancelled. The documentation actively discourages tasks from suppressing cancellation, and I think most users ignore a task once it has been cancelled while running.
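To illustrate why `cancel()` on a running task does not guarantee cancellation: the wrapped coroutine can catch `CancelledError` and return normally, in which case the task ends with a result instead of in the cancelled state. The asyncio documentation describes this as possible but discouraged. A minimal sketch, assuming Python 3.7+ (`suppressed_cancellation` and `stubborn` are hypothetical names):

```python
import asyncio


async def suppressed_cancellation():
    async def stubborn():
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            # Swallowing CancelledError: allowed, but discouraged by the docs.
            return "finished anyway"

    task = asyncio.ensure_future(stubborn())
    await asyncio.sleep(0)       # let the task start and block in sleep()
    task.cancel()                # requested, but the coroutine refuses it
    await asyncio.wait([task])
    return task.cancelled(), task.result()


if __name__ == "__main__":
    print(asyncio.run(suppressed_cancellation()))
```

This prints `(False, 'finished anyway')`, which is exactly the inconsistency described above.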

I agree with Aurelien Lambert: if you call task.cancel(), it makes no sense to expect a normal result from the task. My typical code for task cancellation looks like:

    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task

If you need a partial task result, you can probably build something with asyncio.Condition (or use a queue).

On Thu, Aug 1, 2019 at 7:20 AM <aurelien.lambert.89@gmail.com> wrote:
-- Thanks, Andrew Svetlov
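A self-contained version of the cancel-and-wait pattern above, wrapped in a helper. `cancel_and_wait` and `main` are hypothetical names (not asyncio APIs), and Python 3.7+ is assumed:

```python
import asyncio
import contextlib


async def cancel_and_wait(task):
    """Request cancellation, then wait until the task has really finished."""
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def main():
    task = asyncio.ensure_future(asyncio.sleep(3600))
    await asyncio.sleep(0)          # let the task start
    await cancel_and_wait(task)
    return task.done(), task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This prints `(True, True)`: once the helper returns, the task is guaranteed to be finished.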

participants (4)
- Andrew Svetlov
- aurelien.lambert.89@gmail.com
- Bar Harel
- Guido van Rossum