[Python-ideas] The async API of the future: yield-from
Nick Coghlan
ncoghlan at gmail.com
Tue Oct 16 14:08:15 CEST 2012
On Tue, Oct 16, 2012 at 7:43 PM, Antoine Pitrou <solipsis at pitrou.net> wrote:
> Your example looks rather confusing to me. There are a couple of things
> I don't understand:
>
> - why does load_url_async return something instead of yielding it?
It yields *and* returns, that's the way Guido's API works (as I understand it).
However, some of the other stuff was just plain mistakes in my example.
Background (again, as I understand it, and I'm sure Guido will correct
me if I'm wrong. So, if you think this sounds crazy, *please wait
until Guido clarifies* before worrying too much about it):
- the "@task" decorator is the part that knows how to interface
  generators with the event loop (just as @contextmanager adapts between
  generators and with statements). I believe it handles these things:
    - when you call it, it creates the generator object and calls
      next() to advance it to the first yield point
    - this initial call returns a Future that will fire only when the
      entire *task* is complete
    - if a Future is yielded by the underlying generator, the task
      wrapper adds the appropriate callback to ensure results are pushed
      back into the underlying generator on completion of the associated
      operation
    - when one of these callbacks fires, the generator is advanced and
      a yielded Future is processed in the same fashion
    - if at any point the generator finishes instead of yielding
      another Future, then the callback will call the appropriate
      notification method on the originally *returned* Future
    - yielding anything other than a Future from a tasklet is not permitted
- it's the IO operations themselves that know how to kick off
  operations and register the appropriate callbacks with the event loop
  to get the Future to be triggered
- The Future object API is documented in concurrent.futures:
  http://docs.python.org/py3k/library/concurrent.futures#future-objects
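
To make the list above concrete, here is a minimal sketch of what such a
decorator could look like. It is an assumption built only on
concurrent.futures.Future, not Guido's actual implementation: the names
"task", "step" and "resume" are hypothetical, there is no event loop, and
cancellation is not handled.

```python
import functools
from concurrent.futures import Future


def task(func):
    """Hypothetical sketch of an @task decorator (not the real API)."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        task_future = Future()  # fires only when the entire task is complete

        def step(value=None, exc=None):
            # Advance the generator, either sending a result or throwing
            try:
                if exc is not None:
                    yielded = gen.throw(exc)
                else:
                    yielded = gen.send(value)
            except StopIteration as stop:
                # Generator finished: notify the originally returned Future
                task_future.set_result(stop.value)
                return
            except Exception as err:
                task_future.set_exception(err)
                return
            if not isinstance(yielded, Future):
                gen.close()
                task_future.set_exception(
                    TypeError("tasklets may only yield Futures"))
                return
            # Push the outcome back into the generator on completion
            yielded.add_done_callback(resume)

        def resume(completed):
            try:
                result = completed.result()
            except Exception as err:
                step(exc=err)
            else:
                step(result)

        step()  # send(None) is next(): run to the first yield point
        return task_future
    return start


io_future = Future()  # stands in for a Future returned by an IO operation


@task
def double_when_ready():
    value = yield io_future
    return value * 2


result_future = double_when_ready()  # runs up to the first yield
pending_before_io = not result_future.done()
io_future.set_result(21)  # "IO completes", so the tasklet resumes
```

Here the call to io_future.set_result() plays the role that the event
loop would play in a real implementation.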
I've now posted this example as a gist
(https://gist.github.com/3898874), so it should be easier to read
over there. However, I've included it inline below as well.
- This first part in my example is a helper function to wait for any
one of a set of Futures to be signalled and help keep track of which
ones we're still waiting for
    from concurrent.futures import Future

    def _wait_first(futures):
        # futures must be a set as items will be removed as they complete
        # We create a signalling future to return to our caller. We will copy
        # the result of the first future to complete to this signalling future
        signal = Future()
        def copy_result(completed):
            # We ignore every callback after the first one
            if signal.done():
                return
            # Keep track of which ones have been processed across multiple calls
            futures.remove(completed)
            # It would be nice if we could also remove our callback from all
            # the other futures at this point, but the Future API doesn't
            # currently allow that
            # Now we pass the result of this future through to our signalling
            # future
            if completed.cancelled():
                signal.cancel()
                signal.set_running_or_notify_cancel()
            else:
                try:
                    result = completed.result()
                except Exception as exc:
                    signal.set_exception(exc)
                else:
                    signal.set_result(result)
        # Here we hook our signalling future up to all our actual operations
        # If any of them are already complete, then the callback will fire
        # immediately and we're OK with that. We iterate over a snapshot
        # because such an immediate callback removes an item from the set
        for f in list(futures):
            f.add_done_callback(copy_result)
        # And, for our signalling future to be useful, the caller must be
        # able to access it
        return signal
- This is just a public version of the above helper that works with
arbitrary iterables:
    def wait_first(futures):
        # Helper needs a real set, so we give it one
        # Also makes sure all operations start immediately when passed a
        # generator
        return _wait_first(set(futures))
- This is the API I'm most interested in, as it's the async equivalent
of http://docs.python.org/py3k/library/concurrent.futures#concurrent.futures.as_completed,
which powers this URL retrieval example:
http://docs.python.org/py3k/library/concurrent.futures#threadpoolexecutor-example
    # Note that this is an *ordinary iterator*, not a tasklet
    def as_completed(futures):
        # We ensure all the operations have started, and get ourselves a set
        # to work with
        remaining = set(futures)
        while remaining:
            # The trick here is that we *don't yield the original futures
            # directly*
            # Instead, we yield a signalling future that fires as soon as
            # any one of the remaining operations completes
            yield _wait_first(remaining)
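
The completion-order behaviour can be checked without an event loop. The
sketch below restates the two helpers in condensed form (cancellation
handling is left out, and the set is snapshotted before callbacks are
attached) and resolves plain Future objects by hand, so the variable
names and the manual set_result() calls are illustrative assumptions
only.

```python
from concurrent.futures import Future


def _wait_first(futures):
    # Condensed restatement of the helper (no cancellation handling)
    signal = Future()

    def copy_result(completed):
        if signal.done():
            return
        futures.remove(completed)
        try:
            result = completed.result()
        except Exception as exc:
            signal.set_exception(exc)
        else:
            signal.set_result(result)

    for f in list(futures):  # snapshot: a callback may shrink the set
        f.add_done_callback(copy_result)
    return signal


def as_completed(futures):
    remaining = set(futures)
    while remaining:
        yield _wait_first(remaining)


a, b, c = Future(), Future(), Future()
waits = as_completed([a, b, c])

first = next(waits)
b.set_result("b")  # b happens to finish first
first_result = first.result()

second = next(waits)
c.set_result("c")
second_result = second.result()

third = next(waits)
a.set_result("a")
third_result = third.result()

leftover = list(waits)  # the iterator is exhausted once all have completed
```

Each signalling future is consumed before the next one is requested,
which is what a tasklet does when it yields it.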
And now a more complete, heavily commented, version of the example:
    # First, a tasklet for loading a single page
    @task
    def load_url_async(url):
        # The async URL open operation does three things:
        #   1. kicks off the connection process
        #   2. registers a callback with the event handler that will signal
        #      a Future object when IO is complete
        #   3. returns the future object
        # We then *yield* the Future object, at which point the task
        # decorator takes over and registers a callback with the *Future*
        # object to resume this generator with the *result* that was passed
        # to the Future object
        conn = yield urllib.urlopen_async(url)
        # We assume "conn.read()" is defined in such a way that it allows
        # both "read everything at once" usage *and* a usage where you read
        # the individual bits of data as they arrive like this:
        #     for wait_for_chunk in conn.read():
        #         chunk = yield wait_for_chunk
        # The secret is that conn.read() would be an *ordinary generator* in
        # that case rather than a tasklet.
        # You could also do a version that *only* supported complete reads,
        # in which case the "from" wouldn't be needed
        data = yield from conn.read()
        # We return both the URL *and* the data, so our caller doesn't have
        # to keep track of which url the data is for
        return url, data
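
The "ordinary generator" idea in the comments above can be tried without
any IO. In this sketch, read_all is a hypothetical stand-in for
conn.read(): it yields one Future per chunk and returns the joined data,
which is what "yield from" hands back to the delegating generator. The
generator is driven by hand here, doing the job the task decorator would
do.

```python
from concurrent.futures import Future


def read_all(chunk_futures):
    # Ordinary generator: yield each wait-future, collect what is sent back
    chunks = []
    for wait_for_chunk in chunk_futures:
        chunk = yield wait_for_chunk
        chunks.append(chunk)
    return b"".join(chunks)


def reader(chunk_futures):
    # "yield from" forwards every yielded Future to whoever drives us and
    # evaluates to read_all's return value
    data = yield from read_all(chunk_futures)
    return data


f1, f2 = Future(), Future()
gen = reader([f1, f2])

yielded1 = next(gen)  # the first chunk's future
f1.set_result(b"he")
yielded2 = gen.send(f1.result())  # resume with chunk 1, get next future
f2.set_result(b"llo")
try:
    gen.send(f2.result())  # resume with chunk 2; the generator finishes
except StopIteration as stop:
    data = stop.value
```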
    # And now the payoff: defining a tasklet to read a bunch of URLs in
    # parallel, processing them in the order of loading rather than the
    # order of requesting them or having to wait until the slowest load
    # completes before doing anything
    @task
    def example(urls):
        # We define the tasks we want to run based on the given URLs
        # This results in an iterable of Future objects that will fire when
        # the associated page has been read completely
        tasks = (load_url_async(url) for url in urls)
        # And now we use our helper iterable to run things in parallel
        # and get access to the results as they complete
        for wait_for_page in as_completed(tasks):
            try:
                url, data = yield wait_for_page
            except Exception as exc:
                # "url" is not (re)bound when the yield raises, so the
                # failure report can't name the URL
                print("Something broke ({}: {})".format(type(exc).__name__, exc))
            else:
                print("Loaded {} bytes from {!r}".format(len(data), url))
    # The real kicker here? Replace "yield wait_for_page" with
    # "wait_for_page.result()" and you have the equivalent
    # concurrent.futures code.
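
For comparison, this is roughly what that thread-based equivalent looks
like with the real concurrent.futures API. The load_url function and the
example.invalid URLs are hypothetical stand-ins so that the sketch runs
without network access, and results are collected rather than printed.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_url(url):
    # Hypothetical stand-in for a blocking fetch (no network access)
    if "bad" in url:
        raise OSError("cannot load {!r}".format(url))
    return url, b"x" * len(url)


def example(urls):
    loaded, failed = {}, []
    with ThreadPoolExecutor(max_workers=4) as executor:
        tasks = [executor.submit(load_url, url) for url in urls]
        for wait_for_page in as_completed(tasks):
            try:
                # Blocks the calling thread where the tasklet would yield
                url, data = wait_for_page.result()
            except Exception as exc:
                failed.append(exc)
            else:
                loaded[url] = len(data)
    return loaded, failed


urls = [
    "http://a.example.invalid",
    "http://bad.example.invalid",
    "http://bb.example.invalid",
]
loaded, failed = example(urls)
```

The loop body has the same shape as the tasklet version; only the way
of waiting for each result differs.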
Cheers,
Nick.
--
Nick Coghlan | ncoghlan at gmail.com | Brisbane, Australia