It seems inevitable that if you use await twice you risk being cancelled in between. The solution is to only use a single await to do all the work, like asyncio.queue does (and why not use that for your use case?). I don't think inventing a parallel API of synchronous callbacks is a good idea -- as you say there's a good reason why callbacks are asynchronous, and having two subtly different ways of doing things seems more confusing than helpful. Async I/O is already complicated -- let's not make it more so.


On Tue, Jul 30, 2019 at 2:28 AM <aurelien.lambert.89@gmail.com> wrote:
In asyncio, when a task awaits for another task (or future), it can be cancelled right after the awaited task finished (before the callback have been processed). Thus, if the awaited task has consumed data, the data is lost.

For instance, with the following code:

    import asyncio

    available_data = []
    data_ready = asyncio.Future()

    def feed_data(data):
        global data_ready
        available_data.append(data)
        data_ready.set_result(None)
        data_ready = asyncio.Future()

    async def consume_data():
        while not available_data:
            await asyncio.shield(data_ready)
        return available_data.pop()

    async def wrapped_consumer():
        task = asyncio.ensure_future(consume_data())
        return await task

If I perform those exact steps:

    async def test():
        task = asyncio.ensure_future(wrapped_consumer())
        await asyncio.sleep(0)
        feed_data('data')
        await asyncio.sleep(0)
        task.cancel()
        await asyncio.sleep(0)
        print ('task', task)
        print ('available_data', available_data)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

Then I can see that the task has been cancelled despite the data being consumed. Since the result of `wrapped_consumer` cannot be retrieved, the data is forever lost.

    task <Task cancelled coro=<wrapped_consumer() done, defined at <ipython-input-1-de4ad193b1d0>:17>>
    available_data []

This side effect does not happen when awaiting a coroutine, but coroutine are not as flexible as tasks. It happens when awaiting a `Future`, a `Task`, or any function like `asyncio.wait`, `asyncio.wait_for` or `asyncio.gather` (which all inherit from or use `Future`). There is then no way to do anything equivalent to:

    stop_future = asyncio.Future()
    async def wrapped_consumer2():
        task = asyncio.ensure_future(consume_data())
        try:
            await asyncio.wait([task, stop_future])
        finally:
            task.cancel()
            await asyncio.wait([task])
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

This is due to the Future calling the callback asynchronously:
https://github.com/python/cpython/blob/3.6/Lib/asyncio/futures.py#L214

        for callback in callbacks:
            self._loop.call_soon(callback, self)

I propose to create synchronous versions of those, or a `synchronous_callback` parameter, that turns the callbacks of `Future` synchronous. I've experimented a simple librairy `syncio` with CPython 3.6 to do this (it is harder to patch later versions due to the massive use of private methods).
Basically, needs to:
1) replace the `Future._schedule_callbacks` method by a synchronous version
2) fix `Task._step` to not fail when cleaning `_current_tasks` (https://github.com/python/cpython/blob/3.6/Lib/asyncio/tasks.py#L245)
3) rewrite all the functions to use synchronous futures instead of normal ones

With that librairy, the previous functions are possible and intuitive

    import syncio

    async def wrapped_consumer():
        task = syncio.ensure_sync_future(consume_data())
        return await task

    stop_future = asyncio.Future()
    async def wrapped_consumer2():
        task = syncio.ensure_sync_future(consume_data())
        try:
            await syncio.sync_wait([task, stop_future])
        finally:
            task.cancel()
            await asyncio.wait([task])
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

No need to use `syncio` anywhere else in the code, which makes it totally transparent for the end user.

This "library" can be found here: https://github.com/aure-olli/aiokafka/blob/216b4ad3b4115bc9fa463e44fe23636bd63c5da4/syncio.py
It implements `SyncFuture`, `SyncTask`, `ensure_sync_future`, `sync_wait`, `sync_wait_for`, `sync_gather` and `sync_shield`. It works with CPython 3.6 only.

To conclude:
- asynchronous callbacks are preferrable in most case, but do not provide a coherent cancelled status in specific cases
- implementing a version with synchronous callback (or a `synchronous_callback` parameter) is rather easy (however step 2 need to be clarified, probably a cleaner way to fix this)
- it is totally transparent for the end user, as synchronous callback are totally compatible with asynchronous ones
_______________________________________________
Python-ideas mailing list -- python-ideas@python.org
To unsubscribe send an email to python-ideas-leave@python.org
https://mail.python.org/mailman3/lists/python-ideas.python.org/
Message archived at https://mail.python.org/archives/list/python-ideas@python.org/message/E77XF3K4SRYFG3F6QLFBPP5YOB726DNS/
Code of Conduct: http://python.org/psf/codeofconduct/


--
--Guido van Rossum (python.org/~guido)
Pronouns: he/him/his (why is my pronoun here?)