In asyncio, when a task awaits another task (or future), it can be cancelled right after the awaited task has finished but before its callbacks have been processed. If the awaited task has consumed data in the meantime, that data is lost.
For instance, with the following code:
import asyncio

available_data = []
data_ready = asyncio.Future()

def feed_data(data):
    global data_ready
    available_data.append(data)
    data_ready.set_result(None)
    data_ready = asyncio.Future()

async def consume_data():
    while not available_data:
        await asyncio.shield(data_ready)
    return available_data.pop()

async def wrapped_consumer():
    task = asyncio.ensure_future(consume_data())
    return await task
If I perform those exact steps:
async def test():
    task = asyncio.ensure_future(wrapped_consumer())
    await asyncio.sleep(0)
    feed_data('data')
    await asyncio.sleep(0)
    task.cancel()
    await asyncio.sleep(0)
    print('task', task)
    print('available_data', available_data)

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
Then I can see that the task has been cancelled despite the data being consumed. Since the result of `wrapped_consumer` cannot be retrieved, the data is forever lost.
task <Task cancelled coro=<wrapped_consumer() done, defined at <ipython-input-1-de4ad193b1d0>:17>>
available_data []
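As a rough sketch for more recent Python versions, the same scenario can be rerun with `asyncio.run`, next to a variant that awaits the coroutine directly. The `Feed` class and the `via_task`, `via_coroutine` and `scenario` helpers are hypothetical names introduced here only to avoid module-level futures; they are not part of the original code.

```python
import asyncio


class Feed:
    """Hypothetical container replacing the module-level globals."""

    def __init__(self):
        # Must be created while an event loop is running.
        self.available_data = []
        self.data_ready = asyncio.get_running_loop().create_future()

    def feed_data(self, data):
        self.available_data.append(data)
        self.data_ready.set_result(None)
        self.data_ready = asyncio.get_running_loop().create_future()

    async def consume_data(self):
        while not self.available_data:
            await asyncio.shield(self.data_ready)
        return self.available_data.pop()


async def via_task(feed):
    # Same shape as wrapped_consumer: await an intermediate task.
    return await asyncio.ensure_future(feed.consume_data())


async def via_coroutine(feed):
    # Variant for comparison: await the coroutine directly.
    return await feed.consume_data()


async def scenario(wrapper):
    # Same steps as test(), returning the observations instead of printing.
    feed = Feed()
    task = asyncio.ensure_future(wrapper(feed))
    await asyncio.sleep(0)
    feed.feed_data('data')
    await asyncio.sleep(0)
    task.cancel()
    await asyncio.sleep(0)
    return task.cancelled(), feed.available_data


task_result = asyncio.run(scenario(via_task))
coro_result = asyncio.run(scenario(via_coroutine))
print('via task:     ', task_result)
print('via coroutine:', coro_result)
```

In both runs the outer task should end up cancelled. The difference to look for is `available_data`: emptied in the task variant, still holding `'data'` in the coroutine variant.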
This side effect does not happen when awaiting a coroutine, but coroutines are not as flexible as tasks. It happens when awaiting a `Future`, a `Task`, or any function such as `asyncio.wait`, `asyncio.wait_for` or `asyncio.gather` (all of which inherit from or use `Future`). As a result, there is no way to do anything equivalent to:
stop_future = asyncio.Future()

async def wrapped_consumer2():
    task = asyncio.ensure_future(consume_data())
    try:
        await asyncio.wait([task, stop_future])
    finally:
        task.cancel()
        await asyncio.wait([task])
    if not task.cancelled():
        return task.result()
    else:
        raise RuntimeError('stopped')
This is due to the Future calling the callback asynchronously:
https://github.com/python/cpython/blob/3.6/Lib/asyncio/futures.py#L214
for callback in callbacks:
    self._loop.call_soon(callback, self)
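The effect of `call_soon` can be observed with a small sketch (not taken from the original report): the done callback only runs once control returns to the event loop, not inside `set_result`.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    events = []
    fut.add_done_callback(lambda f: events.append('callback'))
    fut.set_result(None)               # only schedules the callback
    events.append('after set_result')  # runs before the callback
    await asyncio.sleep(0)             # give the loop a chance to run it
    return events


events = asyncio.run(demo())
print(events)
```

The window between `set_result` returning and the callback running is where a `cancel()` call can slip in.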
I propose creating synchronous versions of these, or adding a `synchronous_callback` parameter that makes the callbacks of a `Future` synchronous. I've experimented with a simple library, `syncio`, on CPython 3.6 to do this (later versions are harder to patch because of their heavy use of private methods).
Basically, it needs to:
1) replace the `Future._schedule_callbacks` method with a synchronous version
2) fix `Task._step` so that it does not fail when cleaning up `_current_tasks` (https://github.com/python/cpython/blob/3.6/Lib/asyncio/tasks.py#L245)
3) rewrite all the functions to use synchronous futures instead of normal ones
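Step 1 can be illustrated with a toy future. This is only a sketch: `MiniFuture` is a hypothetical stand-in written for this example, not `syncio.SyncFuture` and not a patch of `asyncio.Future`, and it borrows the proposed `synchronous_callback` parameter name.

```python
import asyncio


class MiniFuture:
    """Toy future (hypothetical) with switchable callback scheduling."""

    def __init__(self, loop, synchronous_callback=False):
        self._loop = loop
        self._sync = synchronous_callback
        self._callbacks = []
        self._result = None
        self._done = False

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def set_result(self, result):
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            if self._sync:
                callback(self)                        # run immediately
            else:
                self._loop.call_soon(callback, self)  # run on a later loop pass


async def compare():
    loop = asyncio.get_running_loop()
    log = []
    for sync in (False, True):
        fut = MiniFuture(loop, synchronous_callback=sync)
        fut.add_done_callback(lambda f, s=sync: log.append(('callback', s)))
        fut.set_result(None)
        log.append(('set_result returned', sync))
    await asyncio.sleep(0)  # let the deferred callback run
    return log


log = asyncio.run(compare())
print(log)
```

With the synchronous variant the callback has already run when `set_result` returns, so there is no gap in which a cancellation could be recorded.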
With that library, the previous functions become possible and intuitive:
import syncio

async def wrapped_consumer():
    task = syncio.ensure_sync_future(consume_data())
    return await task

stop_future = asyncio.Future()

async def wrapped_consumer2():
    task = syncio.ensure_sync_future(consume_data())
    try:
        await syncio.sync_wait([task, stop_future])
    finally:
        task.cancel()
        await asyncio.wait([task])
    if not task.cancelled():
        return task.result()
    else:
        raise RuntimeError('stopped')
There is no need to use `syncio` anywhere else in the code, which makes it totally transparent for the end user.
This "library" can be found here: https://github.com/aure-olli/aiokafka/blob/216b4ad3b4115bc9fa463e44fe23636b…
It implements `SyncFuture`, `SyncTask`, `ensure_sync_future`, `sync_wait`, `sync_wait_for`, `sync_gather` and `sync_shield`. It works with CPython 3.6 only.
To conclude:
- asynchronous callbacks are preferable in most cases, but do not provide a coherent cancelled status in specific cases
- implementing a version with synchronous callbacks (or a `synchronous_callback` parameter) is rather easy (however, step 2 needs to be clarified; there is probably a cleaner way to fix this)
- it is totally transparent for the end user, as synchronous callbacks are totally compatible with asynchronous ones