On Tue, 16 Oct 2012 17:48:24 +1000
Nick Coghlan
def _wait_first(futures):
    # futures must be a set, items will be removed as they complete
    signal = Future()
    def chain_result(completed):
        futures.remove(completed)
        if completed.cancelled():
            signal.cancel()
            signal.set_running_or_notify_cancel()
        elif completed.exception() is not None:
            signal.set_exception(completed.exception())
        else:
            signal.set_result(completed.result())
    for f in futures:
        f.add_done_callback(chain_result)
    return signal
def wait_first(futures):
    return _wait_first(set(futures))
def as_completed(futures):
    remaining = set(futures)
    while 1:
        if not remaining:
            break
        yield _wait_first(remaining)
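A minimal, runnable sketch of the chaining idea above, using concurrent.futures: each input future, once done, forwards its outcome to a shared "signal" future via a done callback, so waiting on the signal wakes us for whichever input finishes first. The `slow` helper and the added `signal.done()` guard (to make the signal one-shot when several inputs complete) are my additions for illustration, not part of the quoted code.

```python
from concurrent.futures import Future, ThreadPoolExecutor
import time

def _wait_first(futures):
    # futures must be a set; items are removed as they complete
    signal = Future()
    def chain_result(completed):
        futures.discard(completed)
        if signal.done():
            # one-shot signal: ignore completions after the first
            return
        if completed.cancelled():
            signal.cancel()
            signal.set_running_or_notify_cancel()
        elif completed.exception() is not None:
            signal.set_exception(completed.exception())
        else:
            signal.set_result(completed.result())
    for f in list(futures):
        f.add_done_callback(chain_result)
    return signal

def slow(n):
    # stand-in for a blocking operation such as a URL fetch
    time.sleep(n)
    return n

with ThreadPoolExecutor() as pool:
    fs = {pool.submit(slow, 0.2), pool.submit(slow, 0.05)}
    first = _wait_first(fs)
    print(first.result())  # result of the faster task: 0.05
```

The real `as_completed` above would then hand out a fresh signal per iteration, each one resolving with the next future to finish from the shrinking `remaining` set.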
@task
def load_url_async(url):
    return url, (yield urllib.urlopen_async(url)).read()
@task
def example(urls):
    for get_next_page in as_completed(load_url_async(url) for url in urls):
        try:
            url, data = yield get_next_page
        except Exception as exc:
            print("Something broke: {}".format(exc))
        else:
            print("Loaded {} bytes from {!r}".format(len(data), url))
Your example looks rather confusing to me. There are a couple of things I don't understand:

- why does load_url_async return something instead of yielding it?

- how does the overlapping of reads happen? You seem to assume that a read() will be non-blocking once the server starts responding to your request, which is only true if the response is small (or you have a very fast connection to the server).

- if read() is really non-blocking, why do you yield get_next_page? What does that achieve? Actually, what is yielding a tuple supposed to achieve at all?

- where is control transferred over to the scheduler? It seems to happen only in get_next_page, while I would expect it to be transferred in as_completed as well.

Regards

Antoine.