[Python-ideas] The async API of the future: yield-from

Piet Delport pjdelport at gmail.com
Mon Oct 15 08:59:12 CEST 2012

On Mon, Oct 15, 2012 at 1:49 AM, Greg Ewing <greg.ewing at canterbury.ac.nz> wrote:
> No, it can't be as simple as that, because that will just
> execute the tasks sequentially. It would have to be something
> like this:
>    def par(*tasks):
>       n = len(tasks)
>       results = [None] * n
>       for i, task in enumerate(tasks):
>          def thunk():
>             nonlocal n
>             results[i] = yield from task
>             n -= 1
>          scheduler.schedule(thunk)
>       while n > 0:
>          yield
>       return results
> Not exactly straightforward, but that's why we write it once
> and put it in the library. :-)

There are two problems with this code. :)

The first is a scoping gotcha: every copy of thunk() will attempt run
the same task, and assign it to the same index, due to them sharing the
"i" and "task" variables. (The closure captures a reference to the outer
variable cells, rather than a copy of their values at the time of
thunk's definition.)

This could be fixed by defining it as "def thunk(i=i, task=task)", to
capture copies.

The second problem is more serious: the final while loop busy-waits,
which will consume all spare CPU time waiting for the underlying tasks
to complete. For this to be practical, it must suspend and resume itself
more efficiently.

Here's my own attempt. I'll assume the following primitive scheduler
instructions (see my "generator task protocol" mail for context), but it
should be readily adaptable to other primitives:

1. yield tasklib.spawn(task()) instructs the scheduler to spawn a new,
   independent task.
2. yield tasklib.suspend() suspends the current task.
3. yield tasklib.get_resume() obtains a callable / primitive that can be
   used to resume the current task later.

I'll also expand it to keep track of success and failure by returning a
list of (flag, result) tuples, in the style of DeferredList[1].


    def par(*tasks):
        resume = yield tasklib.get_resume()

        # Prepare to hold the results, and keep track of progress.
        results = [None] * len(tasks)
        finished = 0

        # Gather the i'th task's result
        def gather(i, task):
            nonlocal finished
                r = yield from task
            except Exception as e:
                results[i] = (False, e)
                results[i] = (True, r)
            finished += 1

            # If we're the last to complete, resume par()
            if finished == len(tasks):
                yield resume()

        # Spawn subtasks, and wait for completion.
        for (i, task) in tasks:
            yield tasklib.spawn(gather(i, task))
        yield tasklib.suspend()

        return results

Hopefully, this is easy enough to read: it should be obvious to see how
to modify gather() to add support for resuming immediately on the first
result or first error.

[1] http://twistedmatrix.com/documents/12.1.0/api/twisted.internet.defer.DeferredList.html

Piet Delport

More information about the Python-ideas mailing list