[Twisted-Python] more thoughts on resumable async data flows
Howdy. I've made quite a few changes to flow.py and it's quickly approaching maturity. To summarize, flow.py is a way to express sequential, streaming data flows that are interruptable as a collection of small, atomic operations. This is needed since sometimes an operation must block, flow.py takes care of resuming your flow at a later time. It's trivial, but here is an example... def printResult(data): print data def addOne(data): return data+1 def finished(): print "finished" def dataSource(data): return [1, 1+data, 1+data*2] a = Flow() a.addBranch(dataSource, finished) a.addCallable(addOne) a.addCallable(printResult) a.execute(2) a.execute(8) The above code constructs a flow, starting with a 'branch' operation that generates a series of events, in this case, the lists [1,3,5] and then in a second execution, [1,9,17]. For each one of these events, a function addOne is called on them, and then the output of that function is directed to be printed. This mechanism works with generators... def simpleGenerator(data): for x in range(data): yield x b = Flow() b.addBranch(simpleGenerator) b.addCallable(printResult) b.execute(5) While this may not be all that useful, what Flow brings to the table is the ability to PauseFlow within a processing stage, for example class simpleIterator: def __init__(self, data): self.data = data def __iter__(self): return self def next(self): print "." if self.data < 0: raise StopIteration ret = self.data self.data -= 1 # # imagine a blocking operation here... sometime if ret % 2: raise PauseFlow # <= Goes to main event loop return ret c = Flow() c.addBranch(simpleIterator) c.addCallable(printResult) c.execute(5) The above code produces 4, 2, 0 ; while this may not be all that interesting, if there were N stages above this code, they would all be resumed properly. By tossing PauseFlow, the entire Flow event loop is stopped, and a reactor.callLater(0, loop-again) is called; giving other events in the queue to work. Thus, the flow construct provides a way to not only manage a bunch of useful code snippets into a process; but more importantly gives a way that the entire flow can be interruped and then resumed later when data arrives. Anyway, it's in the sandbox if anyone wants to play... Clark P.S. Unfortunately, it looks like PauseFlow doesn't work in the context of a generator... pity. Hopefully I'm doing something wrong.
participants (3)
-
Clark C. Evans
-
Matthias Urlichs
-
Philippe Lafoucrière