Guido van Rossum wrote:
On Fri, Oct 12, 2012 at 10:05 PM, Greg Ewing
wrote:
You could go further and say that yielding a tuple of generators means to spawn them all concurrently, wait for them all to complete and send back a tuple of the results. The yield-from code would then look pretty much the same as the futures code.
Sadly it looks like

    r = yield from (f1(), f2())

ends up interpreting the tuple as the iterator.
That's not yielding a tuple of generators. This is:

    r = yield (f1(), f2())

Note the absence of 'from'.
So, can par() be as simple as
    def par(*args):
        results = []
        for task in args:
            result = yield from task
            results.append(result)
        return results
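To make the question concrete, here is that par() under a trivial driver loop; the `task` and `run` helpers are invented scaffolding for illustration, not part of any proposed API:

```python
# The simple par() from the thread.
def par(*args):
    results = []
    for task in args:
        result = yield from task
        results.append(result)
    return results

order = []  # records the interleaving of the tasks

def task(name, value):
    # A toy task with one suspension point.
    order.append(name + ":start")
    yield
    order.append(name + ":end")
    return value

def run(gen):
    # Trivial driver: resume the generator until it finishes and
    # hand back its return value.
    while True:
        try:
            gen.send(None)
        except StopIteration as e:
            return e.value

print(run(par(task("a", 1), task("b", 2))))  # [1, 2]
print(order)  # ['a:start', 'a:end', 'b:start', 'b:end']
```

The order list shows each task finishing completely before the next one begins.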
No, it can't be as simple as that, because that will just execute the tasks sequentially. It would have to be something like this:

    def par(*tasks):
        n = len(tasks)
        results = [None] * n
        for i, task in enumerate(tasks):
            def thunk(i=i, task=task):  # default args bind the loop variables
                nonlocal n
                results[i] = yield from task
                n -= 1
            scheduler.schedule(thunk)
        while n > 0:
            yield
        return results

Not exactly straightforward, but that's why we write it once and put it in the library. :-)
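To see the interleaving, here is a toy round-robin scheduler of my own devising; the `Scheduler` class and its `schedule`/`run` methods are assumptions, since the thread's scheduler is hypothetical. It is just enough machinery to drive a par() like the one above:

```python
from collections import deque

class Scheduler:
    # Toy round-robin scheduler (an assumption for illustration; not the
    # thread's actual API). It steps each ready generator once per turn.
    def __init__(self):
        self.ready = deque()

    def schedule(self, genfunc):
        # Accept a generator function, instantiate it, queue the generator.
        self.ready.append(genfunc())

    def run(self):
        while self.ready:
            gen = self.ready.popleft()
            try:
                gen.send(None)
            except StopIteration:
                continue            # task finished; drop it
            self.ready.append(gen)  # otherwise requeue it round-robin

scheduler = Scheduler()

def par(*tasks):
    n = len(tasks)
    results = [None] * n
    for i, task in enumerate(tasks):
        def thunk(i=i, task=task):   # default args bind the loop variables
            nonlocal n
            results[i] = yield from task
            n -= 1
        scheduler.schedule(thunk)
    while n > 0:
        yield
    return results

order = []   # records which task runs at each step

def work(name, value):
    order.append(name)
    yield        # suspension point: the scheduler switches tasks here
    order.append(name)
    return value

final = []

def main():
    r = yield from par(work("a", 1), work("b", 2))
    final.append(r)

scheduler.schedule(main)
scheduler.run()
print(order)  # ['a', 'b', 'a', 'b'] -- the two tasks interleave
print(final)  # [[1, 2]]
```

Unlike the sequential version, both tasks reach their first suspension point before either finishes.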
Of course there's the question of what to do when one of the tasks raises an error -- I haven't quite figured that out in NDB either; it runs all the tasks to completion, but the caller only sees the first exception. I briefly considered having a "multi-exception" but it felt too weird -- though I'm not married to that decision.
Hmmm. Probably what should happen is that all the other tasks get cancelled and then the exception gets propagated to the caller of par(). If we assume another couple of primitives:

    scheduler.cancel(task)      -- cancels the task
    scheduler.throw(task, exc)  -- raises an exception in the task

then we could implement it this way:

    def par(*tasks):
        n = len(tasks)
        results = [None] * n
        this = scheduler.current_task
        for i, task in enumerate(tasks):
            def thunk(i=i, task=task):  # default args bind the loop variables
                nonlocal n
                try:
                    results[i] = yield from task
                except BaseException as e:
                    for t in tasks:
                        scheduler.cancel(t)
                    scheduler.throw(this, e)
                n -= 1
            scheduler.schedule(thunk)
        while n > 0:
            yield
        return results
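For comparison, the cancel-on-error behaviour can be sketched with modern asyncio, which postdates this thread; asyncio.wait with FIRST_EXCEPTION plus explicit Task.cancel stands in for the hypothetical scheduler primitives:

```python
import asyncio

async def par(*coros):
    # A sketch using today's asyncio, not the thread's hypothetical
    # scheduler: wait for the first failure, cancel the survivors,
    # then re-raise the exception to par()'s caller.
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks,
                                       return_when=asyncio.FIRST_EXCEPTION)
    for t in pending:
        t.cancel()               # analogous to scheduler.cancel(task)
    if pending:
        # Let the cancellations take effect, swallowing the
        # resulting CancelledErrors.
        await asyncio.gather(*pending, return_exceptions=True)
    for t in done:
        if t.exception() is not None:
            raise t.exception()  # propagate a failure to the caller
    return [t.result() for t in tasks]

async def ok(v):
    await asyncio.sleep(0.01)
    return v

async def boom():
    await asyncio.sleep(0.001)
    raise ValueError("boom")

print(asyncio.run(par(ok(1), ok(2))))  # [1, 2]
```

Calling `asyncio.run(par(ok(1), boom(), ok(2)))` cancels the still-pending ok() tasks and raises the ValueError in the caller.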
(10) Registering additional callbacks
While we're at it:

    class task_with_callbacks:

        def __init__(self, task):
            self.task = task
            self.callbacks = []

        def add_callback(self, cb):
            self.callbacks.append(cb)

        def run(self):
            result = yield from self.task
            for cb in self.callbacks:
                cb()
            return result
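Exercising the wrapper above with a hand-rolled driver shows the callbacks firing once the wrapped task has produced its result; the `worker` and `drive` helpers are my own scaffolding, not part of the proposal:

```python
class task_with_callbacks:
    # The wrapper from the thread, verbatim apart from formatting.
    def __init__(self, task):
        self.task = task
        self.callbacks = []

    def add_callback(self, cb):
        self.callbacks.append(cb)

    def run(self):
        result = yield from self.task
        for cb in self.callbacks:
            cb()   # callbacks run inside the task, just before it returns
        return result

def worker():
    # Toy task: suspend once, then produce a value.
    yield
    return 42

def drive(gen):
    # Resume the generator to completion; return its return value.
    while True:
        try:
            gen.send(None)
        except StopIteration as e:
            return e.value

fired = []
t = task_with_callbacks(worker())
t.add_callback(lambda: fired.append("done"))
print(drive(t.run()))  # 42
print(fired)           # ['done']
```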
Here's another pattern that I can't quite figure out. ... Essentially, it's a barrier pattern where multiple tasks (each representing a different HTTP request, and thus not all starting at the same time) render a partial web page and then block until a new HTTP request comes in that provides the missing info.
This should be fairly straightforward.

    waiters = []  # Tasks waiting for the event

When a task wants to wait:

    scheduler.block(waiters)

When the event occurs:

    for t in waiters:
        scheduler.schedule(t)
    del waiters[:]

Incidentally, this is a commonly encountered pattern known as a "condition queue" in IPC parlance. I envisage that the async library would provide encapsulations of this and other standard IPC mechanisms such as mutexes, semaphores, channels, etc.

-- Greg
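The library encapsulation Greg envisages exists in today's asyncio as asyncio.Event, which implements exactly this condition-queue pattern: waiters block on wait(), and set() releases them all at once. A brief sketch (asyncio postdates this thread, so this is an analogy, not the proposal's API):

```python
import asyncio

async def main():
    event = asyncio.Event()
    released = []

    async def waiter(name):
        await event.wait()       # like scheduler.block(waiters)
        released.append(name)

    tasks = [asyncio.ensure_future(waiter(n)) for n in ("a", "b", "c")]
    await asyncio.sleep(0)       # let the waiters reach event.wait()
    event.set()                  # like scheduling every waiting task
    await asyncio.gather(*tasks)
    return sorted(released)

print(asyncio.run(main()))  # ['a', 'b', 'c']
```

All three waiters block until the single set() call, then all proceed, mirroring the loop over the waiters list above.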