[Python-ideas] The async API of the future: yield-from

Nick Coghlan ncoghlan at gmail.com
Mon Oct 15 10:18:13 CEST 2012


On Mon, Oct 15, 2012 at 10:35 AM, Guido van Rossum <guido at python.org> wrote:
> But, as Christian Tismer wrote, we need to have some kind of idea of
> what the primitives are that we want to support. Or should we just
> have async equivalents for everything in threading.py and queue.py?
> (What about thread-local? Do we need task-local? Shudder.)

Task locals aren't so scary, since they're already the main reason why
generators are so handy - task locals are just the frame locals in the
generator :)

The main primitive I personally want out of an async API is a
task-based equivalent to concurrent.futures.as_completed() [1]. This
is what I meant about iteration being a bit of a mess: the way the
as_completed() works, the suspend/resume channel of the iterator
protocol is being used to pass completed future objects back to the
calling iterator. That means that channel *can't* be used to talk
between the coroutine and the scheduler, so if you decide you need to
free it up for that purpose, you're either forced to wait for *all*
the futures to be triggered before any of them can be passed to the
caller (allowing you to use yield-from and return a container of
completed futures) or else you're forced to switch to callback-style
programming (this is where Ruby's blocks are a huge advantage -
because their for loops essentially *are* callbacks, you have a lot
more flexibility in calling back to different places from a single
piece of code).

However, I can see one why to make it work which is to require the
*invoking* code to continue to manage the communication with the
scheduler. Using this concept, there would be an
"as_completed_async()" primitive that works something like:

    for get_next_result in as_completed_task(tasks):
        task, result = yield get_next_result
        # Process this result, wait for next one

The async equivalent of the concurrent.futures example would then look
something like:

    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    @task
    def load_url_async(url, timeout):
        with (yield urlopen_async(url, timeout=timeout)) as handle:
            return url, handle.read()

    tasks = (load_url_async(url, 60) for url in URLS)
    with concurrent.futures.as_completed_async(tasks) as async_results
        for get_next_result in async_results:
            try:
                url, data = yield get_next_result
            except Exception as exc:
                print('{!r} generated an exception: {}'.format(url, exc))
            else:
                print('{!r} page is {:d} bytes'.format(url, len(data)))

Key parts of this idea:

1. as_completed_async registers the supplied tasks with the main
scheduler so they can all start running in parallel
2. as_completed_async is a context manager that will *cancel* all
pending jobs on exit
3. as_completed_async is an iterator that produces a special future
that fires whenever *any* of the registered tasks has run to
completion
4. because there's a separate yield step for each result retrieval,
ordinary exception handling mechanisms can be used rather than needing
to introspect a future object

Cheers,
Nick.

[1] http://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor-example

-- 
Nick Coghlan   |   ncoghlan at gmail.com   |   Brisbane, Australia



More information about the Python-ideas mailing list