[Python-ideas] The async API of the future: yield-from
Greg Ewing
greg.ewing at canterbury.ac.nz
Mon Oct 15 01:49:49 CEST 2012
Guido van Rossum wrote:
> On Fri, Oct 12, 2012 at 10:05 PM, Greg Ewing
> <greg.ewing at canterbury.ac.nz> wrote:
>>You could go further and say that yielding a tuple of generators
>>means to spawn them all concurrently, wait for them all to
>>complete and send back a tuple of the results. The yield-from
>>code would then look pretty much the same as the futures code.
>
> Sadly it looks that
>
>     r = yield from (f1(), f2())
>
> ends up interpreting the tuple as the iterator,
That's not yielding a tuple of generators. This is:
    r = yield (f1(), f2())
Note the absence of 'from'.
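To make the "yield a tuple of generators" idea concrete, here is a
minimal sketch of a driver loop that treats a yielded tuple as "spawn
all of these, then send back a tuple of their results". The run()
function, its bookkeeping dicts, and the fetch() tasks are all
hypothetical names made up for illustration; they are not part of any
proposed API.

```python
from collections import deque

def run(main):
    """Drive main; a yielded tuple of generators is run concurrently."""
    ready = deque([(main, None)])   # (task, value to send into it)
    parent = {}                     # child task -> (waiting task, result index)
    pending = {}                    # waiting task -> [tasks left, results]
    final = None
    while ready:
        task, value = ready.popleft()
        try:
            request = task.send(value)
        except StopIteration as stop:
            if task in parent:
                # A spawned child finished: record its result and wake
                # the waiting task once all of its children are done.
                owner, index = parent.pop(task)
                state = pending[owner]
                state[1][index] = stop.value
                state[0] -= 1
                if state[0] == 0:
                    del pending[owner]
                    ready.append((owner, tuple(state[1])))
            else:
                final = stop.value
            continue
        if isinstance(request, tuple) and request:
            # Spawn every generator in the tuple; the yielding task stays
            # off the ready queue until they have all completed.
            pending[task] = [len(request), [None] * len(request)]
            for index, child in enumerate(request):
                parent[child] = (task, index)
                ready.append((child, None))
        elif isinstance(request, tuple):
            ready.append((task, ()))    # empty tuple: nothing to wait for
        else:
            ready.append((task, None))  # bare yield: just give up the CPU
    return final

def fetch(name, delay):
    for _ in range(delay):
        yield                       # pretend to wait for I/O
    return name

def main():
    r = yield (fetch("f1", 2), fetch("f2", 0))
    return r

result = run(main())                # ("f1", "f2")
```

Results come back in the order the generators were listed, not the
order in which they finished.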
> So, can par() be as simple as
>
>     def par(*args):
>         results = []
>         for task in args:
>             result = yield from task
>             results.append(result)
>         return results
No, it can't be as simple as that, because that will just
execute the tasks sequentially. It would have to be something
like this:
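    def par(*tasks):
        n = len(tasks)
        results = [None] * n
        for i, task in enumerate(tasks):
            # Bind i and task as defaults: the thunk body may not run
            # until after this loop has finished.
            def thunk(i=i, task=task):
                nonlocal n
                results[i] = yield from task
                n -= 1
            scheduler.schedule(thunk)
        while n > 0:
            yield
        return results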
Not exactly straightforward, but that's why we write it once
and put it in the library. :-)
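To try par() without a real event loop, here is a minimal sketch that
pairs it with a toy round-robin scheduler. The Scheduler class, and the
worker() and main() tasks, are hypothetical stand-ins written for this
example. The post leaves open whether schedule() takes a generator or a
generator function, so this version accepts either. Each thunk binds i
and task as default arguments so that it keeps its own result slot.

```python
from collections import deque

class Scheduler:
    """Toy round-robin scheduler (an assumption, not a real library)."""
    def __init__(self):
        self.ready = deque()

    def schedule(self, task):
        # Accept a generator or a zero-argument generator function.
        self.ready.append(task() if callable(task) else task)

    def run(self, main):
        """Run main and everything it schedules; return main's result."""
        result = None
        self.ready.append(main)
        while self.ready:
            task = self.ready.popleft()
            try:
                next(task)
            except StopIteration as stop:
                if task is main:
                    result = stop.value
            else:
                self.ready.append(task)   # still running: back of the queue
        return result

scheduler = Scheduler()

def par(*tasks):
    n = len(tasks)
    results = [None] * n
    for i, task in enumerate(tasks):
        def thunk(i=i, task=task):
            nonlocal n
            results[i] = yield from task
            n -= 1
        scheduler.schedule(thunk)
    while n > 0:
        yield
    return results

log = []

def worker(name, steps):
    for step in range(steps):
        log.append((name, step))
        yield
    return name.upper()

def main():
    return (yield from par(worker("a", 2), worker("b", 1)))

result = scheduler.run(main())
```

The log shows the two workers interleaving rather than running one
after the other, and the results come back in argument order.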
> Of course there's the question of what to do when one of the tasks
> raises an error -- I haven't quite figured that out in NDB either, it
> runs all the tasks to completion but the caller only sees the first
> exception. I briefly considered having a "multi-exception" but it
> felt too weird -- though I'm not married to that decision.
Hmmm. Probably what should happen is that all the other tasks
get cancelled and then the exception gets propagated to the
caller of par(). If we assume another couple of primitives:
    scheduler.cancel(task) -- cancels the task
    scheduler.throw(task, exc) -- raises an exception in the task
then we could implement it this way:
    def par(*tasks):
        n = len(tasks)
        results = [None] * n
        this = scheduler.current_task
        for i, task in enumerate(tasks):
            # Bind i and task as defaults: the thunk body may not run
            # until after this loop has finished.
            def thunk(i=i, task=task):
                nonlocal n
                try:
                    results[i] = yield from task
                except BaseException as e:
                    for t in tasks:
                        scheduler.cancel(t)
                    scheduler.throw(this, e)
                n -= 1
            scheduler.schedule(thunk)
        while n > 0:
            yield
        return results
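One possible reading of those primitives, as a rough sketch: cancel()
closes the generator, and throw() records an exception that the
scheduler raises inside the target task the next time it resumes it.
The Scheduler class below, including current_task and the injected
dict, is an assumption made for this example, as are the ok(), boom()
and main() tasks.

```python
from collections import deque

class Scheduler:
    """Hypothetical scheduler with cancel() and throw() primitives."""
    def __init__(self):
        self.ready = deque()
        self.current_task = None
        self.injected = {}            # task -> exception to raise on resume

    def schedule(self, task):
        self.ready.append(task() if callable(task) else task)

    def cancel(self, task):
        task.close()                  # runs the task's finally blocks

    def throw(self, task, exc):
        self.injected[task] = exc

    def run(self, main):
        self.ready.append(main)
        while self.ready:
            task = self.current_task = self.ready.popleft()
            try:
                if task in self.injected:
                    task.throw(self.injected.pop(task))
                else:
                    next(task)
            except StopIteration:
                continue
            self.ready.append(task)

scheduler = Scheduler()

def par(*tasks):
    n = len(tasks)
    results = [None] * n
    this = scheduler.current_task
    for i, task in enumerate(tasks):
        def thunk(i=i, task=task):
            nonlocal n
            try:
                results[i] = yield from task
            except BaseException as e:
                for t in tasks:
                    scheduler.cancel(t)
                scheduler.throw(this, e)
            n -= 1
        scheduler.schedule(thunk)
    while n > 0:
        yield
    return results

def ok(log):
    try:
        while True:
            yield
    finally:
        log.append("ok cancelled")

def boom():
    yield
    raise ValueError("boom")

def main(log):
    try:
        yield from par(ok(log), boom())
    except ValueError as e:
        log.append("caught %s" % e)

log = []
scheduler.run(main(log))
```

Here the failure in boom() cancels ok() first, and the ValueError then
surfaces in main() at the point where it is waiting on par().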
>>>(10) Registering additional callbacks
While we're at it:
    class task_with_callbacks():
        def __init__(self, task):
            self.task = task
            self.callbacks = []
        def add_callback(self, cb):
            self.callbacks.append(cb)
        def run(self):
            result = yield from self.task
            for cb in self.callbacks:
                cb()
            return result
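A short usage sketch, assuming the wrapped task is an ordinary
generator and that something drives run() to completion. The drive()
helper and the compute() task are hypothetical and exist only to
exercise the class here; a real scheduler would take the place of
drive().

```python
class task_with_callbacks:
    def __init__(self, task):
        self.task = task
        self.callbacks = []

    def add_callback(self, cb):
        self.callbacks.append(cb)

    def run(self):
        result = yield from self.task
        for cb in self.callbacks:
            cb()
        return result

def compute():
    yield                  # pretend to wait for something
    return 42

def drive(gen):
    """Run a generator to completion and return its return value."""
    while True:
        try:
            next(gen)
        except StopIteration as stop:
            return stop.value

fired = []
t = task_with_callbacks(compute())
t.add_callback(lambda: fired.append("done"))
result = drive(t.run())
```

The callbacks fire once, after the wrapped task has returned, and the
task's own result still reaches whoever called run().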
> Here's another pattern that I can't quite figure out. ...
> Essentially, it's a barrier pattern where multiple tasks (each
> representing a different HTTP request, and thus not all starting at
> the same time) render a partial web page and then block until a new
> HTTP request comes in that provides the missing info.
This should be fairly straightforward.
    waiters = [] # Tasks waiting for the event
When a task wants to wait:
    scheduler.block(waiters)
When the event occurs:
    for t in waiters:
        scheduler.schedule(t)
    del waiters[:]
Incidentally, this is a commonly encountered pattern known as a
"condition queue" in IPC parlance. I envisage that the async
library would provide encapsulations of this and other standard
IPC mechanisms such as mutexes, semaphores, channels, etc.
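As a rough sketch of what such an encapsulation might look like,
assuming block() parks the current task on a queue and the task then
yields to give up control: the Scheduler and Condition classes below
are hypothetical, as are the render() and provide() tasks standing in
for the HTTP handlers.

```python
from collections import deque

class Scheduler:
    """Toy scheduler with a block() primitive (an assumption)."""
    def __init__(self):
        self.ready = deque()
        self.current_task = None

    def schedule(self, task):
        self.ready.append(task)

    def block(self, queue):
        # Park the running task on queue; run() will not re-queue it.
        queue.append(self.current_task)
        self.current_task = None

    def run(self):
        while self.ready:
            task = self.current_task = self.ready.popleft()
            try:
                next(task)
            except StopIteration:
                continue
            if self.current_task is task:   # not blocked during this step
                self.ready.append(task)

scheduler = Scheduler()

class Condition:
    """Condition queue: tasks wait until someone calls notify_all()."""
    def __init__(self):
        self.waiters = []

    def wait(self):
        scheduler.block(self.waiters)
        yield

    def notify_all(self):
        for t in self.waiters:
            scheduler.schedule(t)
        del self.waiters[:]

info_ready = Condition()
page = {}
log = []

def render(name):
    log.append(name + " partial")
    yield from info_ready.wait()
    log.append(name + " got " + page["info"])

def provide():
    yield                   # let the renderers reach their wait first
    page["info"] = "x"
    info_ready.notify_all()

scheduler.schedule(render("r1"))
scheduler.schedule(render("r2"))
scheduler.schedule(provide())
scheduler.run()
```

Both renderers emit their partial output, sit on the condition queue
without consuming any scheduler time, and finish only after provide()
supplies the missing info.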
--
Greg