On Mon, Oct 15, 2012 at 10:35 AM, Guido van Rossum <guido@python.org> wrote:
> But, as Christian Tismer wrote, we need to have some kind of idea of what the primitives are that we want to support. Or should we just have async equivalents for everything in threading.py and queue.py? (What about thread-local? Do we need task-local? Shudder.)
Task locals aren't so scary, since they're already the main reason why generators are so handy - task locals are just the frame locals in the generator :)
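(Purely as an illustration of that point, not part of the proposal: each generator-based task carries its own state as ordinary frame locals that survive across suspension points, so no separate task-local storage mechanism is needed for this case.)

    def worker(name):
        count = 0                # frame local doubling as task-local state
        while True:
            item = yield         # suspension point; name and count survive it
            count += 1
            print('{} handled {!r} (total so far: {})'.format(name, item, count))

    a = worker('task-a'); next(a)
    b = worker('task-b'); next(b)
    a.send('spam'); b.send('eggs'); a.send('ham')   # each task keeps its own count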
The main primitive I personally want out of an async API is a task-based equivalent to concurrent.futures.as_completed() [1]. This is what I meant about iteration being a bit of a mess: the way as_completed() works, the suspend/resume channel of the iterator protocol is being used to pass completed future objects back to the calling code. That means that channel *can't* be used to talk between the coroutine and the scheduler, so if you decide you need to free it up for that purpose, you're either forced to wait for *all* the futures to be triggered before any of them can be passed to the caller (allowing you to use yield-from and return a container of completed futures), or else you're forced to switch to callback-style programming (this is where Ruby's blocks are a huge advantage - because their for loops essentially *are* callbacks, you have a lot more flexibility in calling back to different places from a single piece of code).
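For reference, the synchronous pattern being mirrored here is the ThreadPoolExecutor example from the docs page at [1]; lightly adapted (with a shortened URL list), it looks like this:

    import concurrent.futures
    import urllib.request

    URLS = ['http://www.python.org/', 'http://www.bbc.co.uk/']

    def load_url(url, timeout):
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('{!r} generated an exception: {}'.format(url, exc))
            else:
                print('{!r} page is {:d} bytes'.format(url, len(data)))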
However, I can see one way to make it work, which is to require the *invoking* code to continue to manage the communication with the scheduler. Using this concept, there would be an "as_completed_async()" primitive that works something like:
    for get_next_result in as_completed_async(tasks):
        task, result = yield get_next_result
        # Process this result, wait for next one
The async equivalent of the concurrent.futures example would then look something like:
    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    @task
    def load_url_async(url, timeout):
        with (yield urlopen_async(url, timeout=timeout)) as handle:
            return url, handle.read()

    tasks = (load_url_async(url, 60) for url in URLS)
    with concurrent.futures.as_completed_async(tasks) as async_results:
        for get_next_result in async_results:
            try:
                url, data = yield get_next_result
            except Exception as exc:
                print('{!r} generated an exception: {}'.format(url, exc))
            else:
                print('{!r} page is {:d} bytes'.format(url, len(data)))
Key parts of this idea:
1. as_completed_async registers the supplied tasks with the main scheduler so they can all start running in parallel
2. as_completed_async is a context manager that will *cancel* all pending jobs on exit
3. as_completed_async is an iterator that produces a special future that fires whenever *any* of the registered tasks has run to completion
4. because there's a separate yield step for each result retrieval, ordinary exception handling mechanisms can be used rather than needing to introspect a future object
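To make the shape of that API concrete, here's a rough thread-based stand-in built on the existing concurrent.futures machinery (the class name and structure are purely illustrative; the real coroutine version would yield control to the scheduler instead of blocking in the for loop):

    import concurrent.futures

    class as_completed_cm:
        """Illustrative only: iterate over futures as they finish,
        cancelling anything still pending on exit."""

        def __init__(self, futures):
            self._futures = list(futures)

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            for f in self._futures:
                f.cancel()       # point 2: cancel pending jobs on exit
            return False         # don't suppress exceptions

        def __iter__(self):
            # point 3: produce each future as it completes
            return concurrent.futures.as_completed(self._futures)

Point 1 corresponds to submitting the tasks to an executor before entering the with block, and because the caller retrieves each result via future.result() inside the loop, ordinary try/except handling applies, which is roughly the property point 4 is after in the coroutine version.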
Cheers, Nick.
[1] http://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecuto...