[Twisted-Python] more thoughts on resumable async data flows
![](https://secure.gravatar.com/avatar/8ca35506ac08cebd833ab53032896c0b.jpg?s=120&d=mm&r=g)
Howdy. I've made quite a few changes to flow.py and it's quickly approaching maturity. To summarize, flow.py is a way to express sequential, streaming data flows that are interruptable as a collection of small, atomic operations. This is needed since sometimes an operation must block, flow.py takes care of resuming your flow at a later time. It's trivial, but here is an example... def printResult(data): print data def addOne(data): return data+1 def finished(): print "finished" def dataSource(data): return [1, 1+data, 1+data*2] a = Flow() a.addBranch(dataSource, finished) a.addCallable(addOne) a.addCallable(printResult) a.execute(2) a.execute(8) The above code constructs a flow, starting with a 'branch' operation that generates a series of events, in this case, the lists [1,3,5] and then in a second execution, [1,9,17]. For each one of these events, a function addOne is called on them, and then the output of that function is directed to be printed. This mechanism works with generators... def simpleGenerator(data): for x in range(data): yield x b = Flow() b.addBranch(simpleGenerator) b.addCallable(printResult) b.execute(5) While this may not be all that useful, what Flow brings to the table is the ability to PauseFlow within a processing stage, for example class simpleIterator: def __init__(self, data): self.data = data def __iter__(self): return self def next(self): print "." if self.data < 0: raise StopIteration ret = self.data self.data -= 1 # # imagine a blocking operation here... sometime if ret % 2: raise PauseFlow # <= Goes to main event loop return ret c = Flow() c.addBranch(simpleIterator) c.addCallable(printResult) c.execute(5) The above code produces 4, 2, 0 ; while this may not be all that interesting, if there were N stages above this code, they would all be resumed properly. By tossing PauseFlow, the entire Flow event loop is stopped, and a reactor.callLater(0, loop-again) is called; giving other events in the queue to work. Thus, the flow construct provides a way to not only manage a bunch of useful code snippets into a process; but more importantly gives a way that the entire flow can be interruped and then resumed later when data arrives. Anyway, it's in the sandbox if anyone wants to play... Clark P.S. Unfortunately, it looks like PauseFlow doesn't work in the context of a generator... pity. Hopefully I'm doing something wrong.
![](https://secure.gravatar.com/avatar/eb716a395681ce3788bc41d7e599c831.jpg?s=120&d=mm&r=g)
Hi, On Fri, 14 Mar 2003 06:50:03 +0000, Clark C. Evans wrote:
Anyway, it's in the sandbox if anyone wants to play...
Looks nice. I'll probably use that (for processing a rather large amount of database results).
You can't leave a generator with an exception and expect it to be resumable. (Where should it continue?? There's no mechanism to re-enter it at any point other than a yield(), which is not what you want to do!) The best idea would probably be to "yield PauseFlow", and to special-case that in the flow engine. I don't know what your use case is, though; my iterators either don't wait for something (they don't need this), or they block on something (I park them in a different thread), or they wait on a Deferred or whatever (they're not written as generators). -- Matthias
![](https://secure.gravatar.com/avatar/8ca35506ac08cebd833ab53032896c0b.jpg?s=120&d=mm&r=g)
Matthias, Since I wrote this, I've been informed of two relevant posts: twistedmatrix.com/pipermail/twisted-python/2002-September/001685.html twistedmatrix.com/pipermail/twisted-python/2003-February/002808.html. Both of these are doing the same sort of thing, only that they are using a 'pull' mechanism rather than a 'push' technique. Anyway, at first I was thinking that the 'pull' approach is better and that the stuff I wrote may not be useful; but upon further rework, I now think that both approaches are somewhat complementary, and perhaps they could both be supported. Also, the current version in the Sandbox has a few problems as far as usage. I'm not sure how to fix them, but another refactor is needed. Specifically, I'm using addFilter about 80% of the time and the mechansim needs to put addFlush in its public interface (I needed it about 3 times when I was re-working some of my application code... ) Best, Clark On Thu, Mar 20, 2003 at 10:46:55AM +0100, Matthias Urlichs wrote: | Hi, | | On Fri, 14 Mar 2003 06:50:03 +0000, Clark C. Evans wrote: | | > Anyway, it's in the sandbox if anyone wants to play... | > | Looks nice. I'll probably use that (for processing a rather large | amount of database results). | | > P.S. Unfortunately, it looks like PauseFlow doesn't | > work in the context of a generator... pity. Hopefully | > I'm doing something wrong. | > | You can't leave a generator with an exception and expect it to be | resumable. (Where should it continue?? There's no mechanism to re-enter it | at any point other than a yield(), which is not what you want to do!) | | The best idea would probably be to "yield PauseFlow", and to special-case | that in the flow engine. I don't know what your use case is, though; my | iterators either don't wait for something (they don't need this), or | they block on something (I park them in a different thread), or they | wait on a Deferred or whatever (they're not written as generators). | | -- | Matthias | | | _______________________________________________ | Twisted-Python mailing list | Twisted-Python@twistedmatrix.com | http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
Clark C. Evans wrote:
how do you use it with adbapi ?? I mean, how do you use callbacks with your flow ? How does it work if a bloc has several inputs ? I want to do this : DS1 ---> FS1 ---+ +---> FS3 DS2 ---> FS2 -------> FS3 Two datasources provide some data. Data are used to run Fuzzy Inference Systems (FS1, FS2, FS3). FS3 must wait until all its inputs are ready. Is it possible with flow.py ? Will it be possible ?
![](https://secure.gravatar.com/avatar/8ca35506ac08cebd833ab53032896c0b.jpg?s=120&d=mm&r=g)
On Thu, Apr 03, 2003 at 03:02:04PM +0200, Philippe Lafoucri�re wrote: | how do you use it with adbapi ?? I mean, how do you use callbacks | with your flow ? | | How does it work if a bloc has several inputs ? | I want to do this : | | DS1 ---> FS1 ---+ | +---> FS3 | DS2 ---> FS2 -------> FS3 If your queries are returning small results, ie, non-incremental, I would just set this up as a chain of Deferreds and not bother with flow as it might be overkill for your needs. In other words, in the twisted.enterprise.adbapi when you run a query, it returns a deferred. So DS1 and DS2 would be those query results. Your FS1 and FS2 should be deferred operations as well, but fired via the 'addCallback' on DS1 and DS2 respectively. Then FS3 is a DeferredList taking FS1 and FS2. ... If you can use generators, for more complicated flows, etrepum's (Bob Ippolito's) approach may be better approach to this, although I'm not sure how it would handle more 'fluid' flow through the process. I need to understand what he's doing better. At first I thought his approach is very different than mine, but I think (although I'm not quite yet able to explain) that they are isomorphic; only that I'm using a temp stack and he's using the event queue via a linked-list. ... As for the stuff in flow.py, right now it uses a thread for each database connection. Eventually for PostgreSQL, I'd like to move over to a non-threaded driver. Gerhard was working on such a thing as I remember. Also, it may be renamed since extrepum was using "flow.py" before me. Anyway, the database connection is done through a QueryIterator which basically sends a set of rows (via fetchmany) on to the next stage. So, the connection from DS1 to FS1 and DS2 to FS2 could be done using a simple flow for each. The problem in the flow module has to do with merging the two flows. This can be done by using the 'context'. In short, it would work, but I'm not sure that it's your best option; and I'm still not quite happy with flow.py myself (although it is in production). Clark
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
No, this may be used with a very large amount of data.
I agree with that
How do you fire it exactly ? Moreover, the tutorial about deferred isn't so clear for me (twisted newbie).
Then FS3 is a DeferredList taking FS1 and FS2.
Just a note : this system may be dynamic. I mean that the user can create the "flow" he wants (with various datasources and other blocs).
I don't really know generators. I'm going to take a look at like tomorow...
3 flows ? (1 for DS1 to FS1, 1 for DS2 to FS2, and one the merge the 2 others) I was thinking of using only one flow. It's weird to seperate this process in several flow, isn't it ?
thank you for your help
Clark
Thank very much Clark ! Philippe
![](https://secure.gravatar.com/avatar/8ca35506ac08cebd833ab53032896c0b.jpg?s=120&d=mm&r=g)
On Sun, Apr 06, 2003 at 03:39:30PM +0200, Philippe Lafoucri�re wrote: | How do you fire it exactly ? Moreover, the tutorial about deferred isn't so | clear for me (twisted newbie). The one in CVS is a bit better, bu t it still isn't by any means perfect. Basically 'deferred' is just a mechanism for managing callback chains (and errors). | | > Then FS3 is a DeferredList taking FS1 and FS2. | | Just a note : this system may be dynamic. I mean that the user can create | the "flow" he wants (with various datasources and other blocs). *nods* | > As for the stuff in flow.py, right now it uses a thread | > for each database connection. Eventually for PostgreSQL, | > I'd like to move over to a non-threaded driver. Gerhard | > was working on such a thing as I remember. Also, it may | > be renamed since extrepum was using "flow.py" before me. | > | > Anyway, the database connection is done through a | > QueryIterator which basically sends a set of rows | > (via fetchmany) on to the next stage. So, the connection | > from DS1 to FS1 and DS2 to FS2 could be done using | > a simple flow for each. The problem in the flow module | > has to do with merging the two flows. This can be done | > by using the 'context'. | | 3 flows ? (1 for DS1 to FS1, 1 for DS2 to FS2, and one the merge the 2 | others) Yes. In my original version, when you see 'flow' think of a pipeline. It was the first approximation. I'm not totally happy with it... | I was thinking of using only one flow. It's weird to seperate this process | in several flow, isn't it ? Well, your diagram sort of implies a parallel nature, which wasn't part of my original requirements. I have that need now, so flow needs to be updated to respect this. If you want to go over this via phone, and you are in the US or Canada I'd be happy to drop you a line... just send me your phone number and a good time. Best, Clark
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
:-( I am in France near Paris ! but this is my personnal adress : lafou<at>wanadoo<dot>fr and my icq number : 2376706 finally, my msn id : plafoucr@hotmail.com (eark, I hate msn) Thank you again Clark
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
Did you try to contact me ? I 've got a LOT of spam this days, and I am affraid of throwing some "good" mails. thx
![](https://secure.gravatar.com/avatar/eb716a395681ce3788bc41d7e599c831.jpg?s=120&d=mm&r=g)
Hi, On Fri, 14 Mar 2003 06:50:03 +0000, Clark C. Evans wrote:
Anyway, it's in the sandbox if anyone wants to play...
Looks nice. I'll probably use that (for processing a rather large amount of database results).
You can't leave a generator with an exception and expect it to be resumable. (Where should it continue?? There's no mechanism to re-enter it at any point other than a yield(), which is not what you want to do!) The best idea would probably be to "yield PauseFlow", and to special-case that in the flow engine. I don't know what your use case is, though; my iterators either don't wait for something (they don't need this), or they block on something (I park them in a different thread), or they wait on a Deferred or whatever (they're not written as generators). -- Matthias
![](https://secure.gravatar.com/avatar/8ca35506ac08cebd833ab53032896c0b.jpg?s=120&d=mm&r=g)
Matthias, Since I wrote this, I've been informed of two relevant posts: twistedmatrix.com/pipermail/twisted-python/2002-September/001685.html twistedmatrix.com/pipermail/twisted-python/2003-February/002808.html. Both of these are doing the same sort of thing, only that they are using a 'pull' mechanism rather than a 'push' technique. Anyway, at first I was thinking that the 'pull' approach is better and that the stuff I wrote may not be useful; but upon further rework, I now think that both approaches are somewhat complementary, and perhaps they could both be supported. Also, the current version in the Sandbox has a few problems as far as usage. I'm not sure how to fix them, but another refactor is needed. Specifically, I'm using addFilter about 80% of the time and the mechansim needs to put addFlush in its public interface (I needed it about 3 times when I was re-working some of my application code... ) Best, Clark On Thu, Mar 20, 2003 at 10:46:55AM +0100, Matthias Urlichs wrote: | Hi, | | On Fri, 14 Mar 2003 06:50:03 +0000, Clark C. Evans wrote: | | > Anyway, it's in the sandbox if anyone wants to play... | > | Looks nice. I'll probably use that (for processing a rather large | amount of database results). | | > P.S. Unfortunately, it looks like PauseFlow doesn't | > work in the context of a generator... pity. Hopefully | > I'm doing something wrong. | > | You can't leave a generator with an exception and expect it to be | resumable. (Where should it continue?? There's no mechanism to re-enter it | at any point other than a yield(), which is not what you want to do!) | | The best idea would probably be to "yield PauseFlow", and to special-case | that in the flow engine. I don't know what your use case is, though; my | iterators either don't wait for something (they don't need this), or | they block on something (I park them in a different thread), or they | wait on a Deferred or whatever (they're not written as generators). | | -- | Matthias | | | _______________________________________________ | Twisted-Python mailing list | Twisted-Python@twistedmatrix.com | http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
Clark C. Evans wrote:
how do you use it with adbapi ?? I mean, how do you use callbacks with your flow ? How does it work if a bloc has several inputs ? I want to do this : DS1 ---> FS1 ---+ +---> FS3 DS2 ---> FS2 -------> FS3 Two datasources provide some data. Data are used to run Fuzzy Inference Systems (FS1, FS2, FS3). FS3 must wait until all its inputs are ready. Is it possible with flow.py ? Will it be possible ?
![](https://secure.gravatar.com/avatar/8ca35506ac08cebd833ab53032896c0b.jpg?s=120&d=mm&r=g)
On Thu, Apr 03, 2003 at 03:02:04PM +0200, Philippe Lafoucri�re wrote: | how do you use it with adbapi ?? I mean, how do you use callbacks | with your flow ? | | How does it work if a bloc has several inputs ? | I want to do this : | | DS1 ---> FS1 ---+ | +---> FS3 | DS2 ---> FS2 -------> FS3 If your queries are returning small results, ie, non-incremental, I would just set this up as a chain of Deferreds and not bother with flow as it might be overkill for your needs. In other words, in the twisted.enterprise.adbapi when you run a query, it returns a deferred. So DS1 and DS2 would be those query results. Your FS1 and FS2 should be deferred operations as well, but fired via the 'addCallback' on DS1 and DS2 respectively. Then FS3 is a DeferredList taking FS1 and FS2. ... If you can use generators, for more complicated flows, etrepum's (Bob Ippolito's) approach may be better approach to this, although I'm not sure how it would handle more 'fluid' flow through the process. I need to understand what he's doing better. At first I thought his approach is very different than mine, but I think (although I'm not quite yet able to explain) that they are isomorphic; only that I'm using a temp stack and he's using the event queue via a linked-list. ... As for the stuff in flow.py, right now it uses a thread for each database connection. Eventually for PostgreSQL, I'd like to move over to a non-threaded driver. Gerhard was working on such a thing as I remember. Also, it may be renamed since extrepum was using "flow.py" before me. Anyway, the database connection is done through a QueryIterator which basically sends a set of rows (via fetchmany) on to the next stage. So, the connection from DS1 to FS1 and DS2 to FS2 could be done using a simple flow for each. The problem in the flow module has to do with merging the two flows. This can be done by using the 'context'. In short, it would work, but I'm not sure that it's your best option; and I'm still not quite happy with flow.py myself (although it is in production). Clark
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
No, this may be used with a very large amount of data.
I agree with that
How do you fire it exactly ? Moreover, the tutorial about deferred isn't so clear for me (twisted newbie).
Then FS3 is a DeferredList taking FS1 and FS2.
Just a note : this system may be dynamic. I mean that the user can create the "flow" he wants (with various datasources and other blocs).
I don't really know generators. I'm going to take a look at like tomorow...
3 flows ? (1 for DS1 to FS1, 1 for DS2 to FS2, and one the merge the 2 others) I was thinking of using only one flow. It's weird to seperate this process in several flow, isn't it ?
thank you for your help
Clark
Thank very much Clark ! Philippe
![](https://secure.gravatar.com/avatar/8ca35506ac08cebd833ab53032896c0b.jpg?s=120&d=mm&r=g)
On Sun, Apr 06, 2003 at 03:39:30PM +0200, Philippe Lafoucri�re wrote: | How do you fire it exactly ? Moreover, the tutorial about deferred isn't so | clear for me (twisted newbie). The one in CVS is a bit better, bu t it still isn't by any means perfect. Basically 'deferred' is just a mechanism for managing callback chains (and errors). | | > Then FS3 is a DeferredList taking FS1 and FS2. | | Just a note : this system may be dynamic. I mean that the user can create | the "flow" he wants (with various datasources and other blocs). *nods* | > As for the stuff in flow.py, right now it uses a thread | > for each database connection. Eventually for PostgreSQL, | > I'd like to move over to a non-threaded driver. Gerhard | > was working on such a thing as I remember. Also, it may | > be renamed since extrepum was using "flow.py" before me. | > | > Anyway, the database connection is done through a | > QueryIterator which basically sends a set of rows | > (via fetchmany) on to the next stage. So, the connection | > from DS1 to FS1 and DS2 to FS2 could be done using | > a simple flow for each. The problem in the flow module | > has to do with merging the two flows. This can be done | > by using the 'context'. | | 3 flows ? (1 for DS1 to FS1, 1 for DS2 to FS2, and one the merge the 2 | others) Yes. In my original version, when you see 'flow' think of a pipeline. It was the first approximation. I'm not totally happy with it... | I was thinking of using only one flow. It's weird to seperate this process | in several flow, isn't it ? Well, your diagram sort of implies a parallel nature, which wasn't part of my original requirements. I have that need now, so flow needs to be updated to respect this. If you want to go over this via phone, and you are in the US or Canada I'd be happy to drop you a line... just send me your phone number and a good time. Best, Clark
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
:-( I am in France near Paris ! but this is my personnal adress : lafou<at>wanadoo<dot>fr and my icq number : 2376706 finally, my msn id : plafoucr@hotmail.com (eark, I hate msn) Thank you again Clark
![](https://secure.gravatar.com/avatar/792121149081c9c0fd251c5bbc7691cb.jpg?s=120&d=mm&r=g)
Did you try to contact me ? I 've got a LOT of spam this days, and I am affraid of throwing some "good" mails. thx
participants (3)
-
Clark C. Evans
-
Matthias Urlichs
-
Philippe Lafoucrière