In asyncio, when a task awaits for another task (or future), it can be cancelled right after the awaited task finished. Thus, if the awaited task has consumed data, the data is lost.

For instance, with the following code:

    import asyncio

    available_data = []
    data_ready = asyncio.Future()

    def feed_data(data):
        global data_ready
        available_data.append(data)
        data_ready.set_result(None)
        data_ready = asyncio.Future()

    async def consume_data():
        while not available_data:
            await asyncio.shield(data_ready)
        return available_data.pop()

    async def wrapped_consumer():
        task = asyncio.ensure_future(consume_data())
        return await task

If I perform those exact steps:

    async def test():
        task = asyncio.ensure_future(wrapped_consumer())
        await asyncio.sleep(0)
        feed_data('data')
        await asyncio.sleep(0)
        task.cancel()
        await asyncio.sleep(0)
        print ('task', task)
        print ('available_data', available_data)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

Then I can see that the task has been cancelled despite the data being consumed. Since the result of `wrapped_consumer` cannot be retrieved, the data is forever lost.

    task <Task cancelled coro=<wrapped_consumer() done, defined at <ipython-input-1-de4ad193b1d0>:17>>
    available_data []

This side effect does not happen when awaiting a coroutine, but coroutines are not as flexible as tasks (unless manipulated as a generator). It happens when awaiting a `Future`, a `Task`, or any function like `asyncio.wait`, `asyncio.wait_for` or `asyncio.gather` (which all inherit from or use `Future`). There is then no way to do anything equivalent to:

    stop_future = asyncio.Future()
    async def wrapped_consumer2():
        task = asyncio.ensure_future(consume_data())
        try:
            await asyncio.wait([task, stop_future])
        finally:
            task.cancel()
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

This is due to the Future calling the callback asynchronously:
https://github.com/python/cpython/blob/3.6/Lib/asyncio/futures.py#L214

        for callback in callbacks:
            self._loop.call_soon(callback, self)

I propose to create synchronous versions of those, or a `synchronous_callback` parameter, that turns the callbacks of `Future` synchronous. I've experimented a simple library `syncio` with CPython 3.6 to do this (it is harder to patch later versions due to the massive use of private methods).
Basically, needs to:
1) replace the `Future._schedule_callbacks` method by a synchronous version
2) fix `Task._step` to not fail when cleaning `_current_tasks` (https://github.com/python/cpython/blob/3.6/Lib/asyncio/tasks.py#L245)
3) rewrite all the functions to use synchronous futures instead of normal ones

With that library, the previous functions are possible and intuitive

    import syncio
   
    async def wrapped_consumer():
        task = syncio.ensure_sync_future(consume_data())
        return await task

    stop_future = asyncio.Future()
    async def wrapped_consumer2():
        task = syncio.ensure_sync_future(consume_data())
        try:
            await syncio.sync_wait([task, stop_future])
        finally:
            task.cancel()
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

No need to use `syncio` anywhere else in the code, which makes it totally transparent for the end user. `wrapped_consumer` and `wrapped_consumer2` are now cancelled if and only if the data hasn't been consumed, whatever is the order of the steps (and the presence of `asyncio.sleep`).

This "library" can be found here: https://github.com/aure-olli/aiokafka/blob/3acb88d6ece4502a78e230b234f47b90b9d30fd5/syncio.py
It implements `SyncFuture`, `SyncTask`, `ensure_sync_future`, `sync_wait`, `sync_wait_for`, `sync_gather` and `sync_shield`. It works with CPython 3.6.

To conclude:
- asynchronous callbacks are preferable in most cases, but do not provide a coherent cancelled status in specific cases
- implementing a version with synchronous callback (or a `synchronous_callback` parameter) is rather easy (however step 2 need to be clarified, probably a cleaner way to fix this)
- it is totally transparent for the end user, as synchronous callbacks are totally compatible with asynchronous ones