On Sun, Oct 14, 2012 at 4:49 PM, Greg Ewing
Guido van Rossum wrote:
On Fri, Oct 12, 2012 at 10:05 PM, Greg Ewing
wrote: You could go further and say that yielding a tuple of generators means to spawn them all concurrently, wait for them all to complete and send back a tuple of the results. The yield-from code would then look pretty much the same as the futures code.
Sadly it looks that
r = yield from (f1(), f2())
ends up interpreting the tuple as the iterator,
That's not yielding a tuple of generators. This is:
r = yield (f1(), f2())
Note the absence of 'from'.
That's what I meant -- excuse me for not writing "yield-fromming". :-)
So, can par() be as simple as
def par(*args): results = [] for task in args: result = yield from task results.append(result) return results
No, it can't be as simple as that, because that will just execute the tasks sequentially.
Yeah, Ben just cleared that up for me.
It would have to be something like this:
def par(*tasks): n = len(tasks) results = [None] * n for i, task in enumerate(tasks): def thunk(): nonlocal n results[i] = yield from task n -= 1 scheduler.schedule(thunk) while n > 0: yield return results
Not exactly straightforward, but that's why we write it once and put it in the library. :-)
But, as Christian Tismer wrote, we need to have some kind of idea of what the primitives are that we want to support. Or should we just have async equivalents for everything in threading.py and queue.py? (What about thread-local? Do we need task-local? Shudder.)
Of course there's the question of what to do when one of the tasks raises an error -- I haven't quite figured that out in NDB either, it runs all the tasks to completion but the caller only sees the first exception. I briefly considered having an "multi-exception" but it felt too weird -- though I'm not married to that decision.
Hmmm. Probably what should happen is that all the other tasks get cancelled and then the exception gets propagated to the caller of par().
I think it ought to be at least an option to run them all to completion -- I can easily imagine use cases for that. Also for wanting to receive a list of exceptions. A practical par() may have to grow a few options...
If we assume another couple of primitives:
scheduler.cancel(task) -- cancels the task
scheduler.throw(task, exc) -- raises an exception in the task
then we could implement it this way:
def par(*tasks): n = len(tasks) results = [None] * n this = scheduler.current_task for i, task in enumerate(tasks): def thunk(): nonlocal n try: results[i] = yield from task except BaseException as e: for t in tasks: scheduler.cancel(t) scheduler.throw(this, e) n -= 1 scheduler.schedule(thunk) while n > 0: yield return results
I glazed over here but I trust you.
(10) Registering additional callbacks
While we're at it:
class task_with_callbacks():
def __init__(self, task): self.task = task self.callbacks = []
def add_callback(self, cb): self.callbacks.append(cb)
def run(self): result = yield from self.task for cb in self.callbacks: cb() return result
Nice. (In fact so simple that maybe users can craft this for themselves?)
Here's another pattern that I can't quite figure out. ...
Essentially, it's a barrier pattern where multiple tasks (each representing a different HTTP request, and thus not all starting at the same time) render a partial web page and then block until a new HTTP request comes in that provides the missing info.
This should be fairly straightforward.
waiters = [] # Tasks waiting for the event
When a task wants to wait:
scheduler.block(waiters)
When the event occurs:
for t in waiters: scheduler.schedule(t) del waiters[:]
Incidentally, this is a commonly encountered pattern known as a "condition queue" in IPC parlance. I envisage that the async library would provide encapsulations of this and other standard IPC mechanisms such as mutexes, semaphores, channels, etc.
Maybe you meant condition variable? It looks like threading.Condition with notify_all(). Anyway, I agree we need some primitives like these, but I'm not sure how to choose the set of essentials. -- --Guido van Rossum (python.org/~guido)