On Mon, Oct 15, 2012 at 1:49 AM, Greg Ewing wrote:
No, it can't be as simple as that, because that will just execute the tasks sequentially. It would have to be something like this:
    def par(*tasks):
        n = len(tasks)
        results = [None] * n
        for i, task in enumerate(tasks):
            def thunk():
                nonlocal n
                results[i] = yield from task
                n -= 1
            scheduler.schedule(thunk)
        while n > 0:
            yield
        return results
Not exactly straightforward, but that's why we write it once and put it in the library. :-)
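[To make the reply below easier to follow, here is a minimal standalone sketch, with hypothetical names not from the thread, of how closures defined in a loop capture the loop variable late, and how the default-argument idiom captures a copy instead:]

```python
# Late-binding closure pitfall: thunks defined in a loop share the loop
# variable's cell, so all of them see its final value.
def make_thunks_shared():
    thunks = []
    for i in range(3):
        def thunk():
            return i        # closes over the variable cell, not its value
        thunks.append(thunk)
    return [t() for t in thunks]

# The usual fix: a default argument copies the current value of i at
# the time each thunk is defined.
def make_thunks_bound():
    thunks = []
    for i in range(3):
        def thunk(i=i):     # default arg binds a copy of i
            return i
        thunks.append(thunk)
    return [t() for t in thunks]

print(make_thunks_shared())  # [2, 2, 2] -- every thunk sees the final i
print(make_thunks_bound())   # [0, 1, 2]
```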
There are two problems with this code. :)

The first is a scoping gotcha: every copy of thunk() will attempt to run the same task, and assign its result to the same index, because they all share the "i" and "task" variables. (The closure captures a reference to the outer variable cells, rather than a copy of their values at the time of thunk's definition.) This could be fixed by defining it as "def thunk(i=i, task=task)", to capture copies.

The second problem is more serious: the final while loop busy-waits, which will consume all spare CPU time waiting for the underlying tasks to complete. For this to be practical, par() must suspend and resume itself more efficiently.

Here's my own attempt. I'll assume the following primitive scheduler instructions (see my "generator task protocol" mail for context), but it should be readily adaptable to other primitives:

1. "yield tasklib.spawn(task())" instructs the scheduler to spawn a new, independent task.

2. "yield tasklib.suspend()" suspends the current task.

3. "yield tasklib.get_resume()" obtains a callable / primitive that can be used to resume the current task later.

I'll also expand par() to keep track of success and failure by returning a list of (flag, result) tuples, in the style of DeferredList [1].

Code:

    def par(*tasks):
        resume = yield tasklib.get_resume()

        # Prepare to hold the results, and keep track of progress.
        results = [None] * len(tasks)
        finished = 0

        # Gather the i'th task's result.
        def gather(i, task):
            nonlocal finished
            try:
                r = yield from task
            except Exception as e:
                results[i] = (False, e)
            else:
                results[i] = (True, r)
            finished += 1
            # If we're the last to complete, resume par().
            if finished == len(tasks):
                yield resume()

        # Spawn subtasks, and wait for completion.
        for i, task in enumerate(tasks):
            yield tasklib.spawn(gather(i, task))
        yield tasklib.suspend()

        return results

Hopefully, this is easy enough to read: it should be obvious how to modify gather() to add support for resuming immediately on the first result or first error.
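[For illustration, a toy single-threaded trampoline that implements just enough of the three primitives above to run par(). This is a sketch under stated assumptions, not the actual tasklib from the "generator task protocol" mail: the instruction classes, run() loop, and demo tasks are all invented here, and par() is reproduced with the enumerate() fix applied.]

```python
# Toy trampoline scheduler (illustrative sketch, not the real tasklib).
import collections

class Spawn:                       # yield Spawn(gen): start a new task
    def __init__(self, task):
        self.task = task

class Suspend:                     # yield Suspend(): park current task
    pass

class GetResume:                   # yield GetResume(): receive a resume fn
    pass

class Resume:                      # yield Resume(gen): wake a parked task
    def __init__(self, task):
        self.task = task

def spawn(task): return Spawn(task)
def suspend(): return Suspend()
def get_resume(): return GetResume()

def run(main):
    """Round-robin loop; returns the main task's return value."""
    ready = collections.deque([(main, None)])
    main_result = None
    while ready:
        task, value = ready.popleft()
        try:
            instr = task.send(value)
        except StopIteration as stop:
            if task is main:
                main_result = stop.value
            continue
        if isinstance(instr, Spawn):
            ready.append((instr.task, None))    # new independent task
            ready.append((task, None))          # spawner keeps running
        elif isinstance(instr, Suspend):
            pass                                # parked until a Resume
        elif isinstance(instr, GetResume):
            # Send back a callable that produces a Resume instruction
            # for this task (the default arg binds the current task).
            ready.append((task, lambda t=task: Resume(t)))
        elif isinstance(instr, Resume):
            ready.append((instr.task, None))    # wake the parked task
            ready.append((task, None))
        else:                                   # bare yield: reschedule
            ready.append((task, None))
    return main_result

# par() from the post, with the enumerate() fix applied.
def par(*tasks):
    resume = yield get_resume()
    results = [None] * len(tasks)
    finished = 0

    def gather(i, task):
        nonlocal finished
        try:
            r = yield from task
        except Exception as e:
            results[i] = (False, e)
        else:
            results[i] = (True, r)
        finished += 1
        if finished == len(tasks):
            yield resume()

    for i, task in enumerate(tasks):
        yield spawn(gather(i, task))
    yield suspend()
    return results

# Hypothetical demo tasks: each yields once to simulate async work.
def const(x):
    yield
    return x

def boom():
    yield
    raise ValueError('boom')

def main():
    outcome = yield from par(const(1), const(2), boom())
    return outcome

print(run(main()))   # two (True, result) pairs, then the (False, error)
```

[A real scheduler would also need to cope with resume() firing before the target task has actually suspended; this toy sidesteps that by having every demo task yield at least once.]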
[1] http://twistedmatrix.com/documents/12.1.0/api/twisted.internet.defer.Deferre...

--
Piet Delport