[Twisted-Python] Need some pointers for writing asynchronous code for Twisted app

I'm trying to rewrite an existing 'back end' server application. The old app worked something like this. Client boxes, using multiple methods (ftp, copy over an NFS mount, even rsh sessions), create a file on disk on the server, in either XML or a simple proprietary data format. The server was written in Python. It loops over a directory, looking for new files, and processes them into a MySQL database.

I patched the client (not Python, but proprietary vendor apps, which are glued together via Tcl) to just write the data to a TCP socket. Using Twisted, I now have a test TCP server running, which uses LineReceiver, with each line received added to a list, and a connection close callback that writes the data to a file. This last part isn't part of the final app, and it does block, I know. It's just for me to see the data we're getting.

So we have a working system now that transmits the files via a unified method, to a server that can handle simultaneous connections. Cool. Now I have to do real work with the data. I have some architectural questions on how to proceed.

Take the case of the XML data. In the old version, it reads the XML into an ElementTree and uses business logic to iterate through all or part of the tree, building a key/value dict. That dict is passed to another object whose methods construct SQL inserts from the dict data and make the db calls. (That's simplified; the current db layer is a huge Rube Goldberg machine.)

So the easy way out, it seems to me, would be to make the LineReceiver callback build the ElementTree as I get it. Then wrap minimally modified versions of the code that processes the ElementTree to the dict, and the dict to the database, in a callInThread or deferToThread call. That is a lot of use of the thread pool, which seems to violate the idea of a low-overhead asynchronous event loop. So is there a better way?
For example, if I have a callback chain, when the first one fires, do they all fire in sequence as the prior callback returns, or does the chain yield to other events? If it does, I could potentially break the code into smaller chunks, say so each one processes enough tree data to generate one dict entry, and add the chunks as a callback chain on connectionLost.

Note: None of this code is tested, I'm just trying to get the basic logic worked out. Something like this?

    def connectionLost(self):
        d = defer.Deferred()
        d.addCallback(chunkOne)
        d.addCallback(chunkTwo)
        d.addCallback(chunkThree)
        d.addCallback(chunkN...)
        d.addCallback(finish)
        d.callback(self.myElementTree)

If I have a bunch of connections that close as simultaneously as the implementation allows, does that sequence all fire first for one closing connection, then the next, and so on? Or do they intermix?

Or do I need to set up a chain of deferreds with explicit scheduling? Something like:

    def connectionLost(self):
        self.myDict = {}
        finish()

    def finish(self):
        d = defer.Deferred()
        def realFinish(d):
            do stuff to clean up
        d.addCallback(chunkThree)
        d.addCallback(realFinish)
        reactor.callLater(0, d.callback, None)

    def chunkThree():
        d = defer.Deferred()
        def realChunkThree(self.myElementTree, self.myDict):
            do stuff to process one dict key
        d.addCallback(chunkTwo)
        d.addCallback(realChunkThree)
        reactor.callLater(0, d.callback, None)
        return d

    etc.

The above doesn't really seem much different from the first; it's just that we schedule the calls explicitly, and pass data around in multiple deferreds.
The last thing I thought about doing was something like this:

    def connectionLost(self):
        myDict = {}
        d = defer.Deferred()
        d.addCallback(finish)
        myIterObj = self.myElementTree.getIterator()
        def processChunk():
            try:
                foo = myIterObj.next()
                do stuff with foo to process element to dict entry
            except StopIteration:
                d.callback(None)
            except:
                error handling stuff
            else:
                reactor.callLater(0, processChunk)
        return d

Except I found some really similar code in an old thread, where Bob Ippolito says, 'just use flow instead':

http://twistedmatrix.com/pipermail/twisted-python/2003-July/005013.html

But the current flow doc says: Don't use flow, write asynchronous code.

* Brian Costlow <brian.costlow@gmail.com> [2007-03-09 18:34:17 -0500]:
This is very similar to what twisted.internet.task.coiterate() does. See the following:

http://jcalderone.livejournal.com/24285.html
http://twistedmatrix.com/documents/current/api/twisted.internet.task.html

--
mithrandi, i Ainil en-Balandor, a faer Ambar

Brian Costlow wrote: [...]
Most databases don't really give you any choice but to use threads. twisted.enterprise.adbapi helps a little. Compared to the time it takes for the database to do its stuff, I doubt you'll notice the thread overhead.
Deferreds are completely independent of the reactor (i.e. event loop). They don't do any magical yielding to the event loop or anything like that. Deferreds simply manage the chain of callbacks, and arrange for them to be called as soon as the data they're waiting on is there.
They will all fire immediately. It's just like doing:

    result = chunkOne(self.myElementTree)
    result = chunkTwo(result)
    result = chunkThree(result)
    result = chunkN(result)
    ...
    finish(result)

i.e. synchronous.
This is an awkward way to arrange it, but this would let the reactor do work between the chunks, yes.
This approach can work a little better, yeah. Note that returning a Deferred from connectionLost doesn't do anything. What do you want to wait on the deferred (i.e. what in your code is waiting on this result)? As far as I can tell, nothing. If so, you probably don't want a Deferred at all.

http://twistedmatrix.com/projects/core/documentation/examples/longex.py
and
http://twistedmatrix.com/projects/core/documentation/examples/longex2.py

have some basic examples of this sort of stuff.

There's also the "cooperator" module in http://divmod.org/trac/wiki/DivmodEpsilon, but it's totally lacking in documentation. So who knows if it's really appropriate for this, or if you'll be able to figure out how to use it.
In this case, twisted.internet.defer.inlineCallbacks could probably be used instead of flow:

    @inlineCallbacks
    def doChunks():
        for chunk in chunks:
            # do the next chunk
            chunk()
            # yield to the event loop
            d = Deferred()
            reactor.callLater(0, d.callback, None)
            yield d

(The deferLater function in http://twistedmatrix.com/trac/ticket/1875 would make this even shorter.)

Finally, are you sure you really need to chunk this processing at all? ElementTree is pretty fast; it's entirely possible that breaking it into chunks and going in-and-out of the event loop repeatedly will hurt your performance more than just doing it all at once. It might be a good idea to check if you actually have a real performance problem (rather than just a theoretical one) before you worry about solving it.

Similarly, consider just putting the computationally expensive stuff in a deferToThread call and letting your OS worry about scheduling it. If the processing doesn't need to interact much with the event-driven code, then this can be a good option.

-Andrew.

On Sat, 2007-03-10 at 12:02 +1100, Andrew Bennetts wrote:
I agree. Premature optimisation is the root of all evil. I've done some really dumb things trying to optimise too early, though I thought I was being really clever at the time. If you're not sure how you're going to do things, write a prototype first, then rewrite from scratch using everything you learn from the prototype. You end up with better code in the end, and it's faster and easier than trying to gradually migrate old, poorly written code, in my experience. -- Justin Warren <daedalus@eigenmagic.com>

Andrew Bennetts wrote:
Two pure asynchronous database APIs (written in Twisted), for PostgreSQL only, are pgasync
http://jamwt.com/pgasync/
and pglib
http://developer.berlios.de/projects/pglib/

Regards
Manlio Perillo

Thanks everyone for your comments, you were all a lot of help.

Andrew, I was aware that the deferred didn't go anywhere. I oversimplified things in the example; in the original design in my head, the code that actually does the db calls was waiting on the deferred. Actually, the way it's written, it never even runs: I left out the first call to processChunk() necessary to start the iteration. But your comments helped a lot. My concern wasn't ElementTree's built-in stuff, but a 300+ line spaghetti procedure that massages the element tree data into a dictionary. That needs to be rewritten anyway.

I messed around this morning, and here's what I have done. I compiled cElementTree onto the test box. The LineReceiver uses cElementTree.XMLTreeBuilder.feed() and builds the tree on the fly. On connection close it calls a method that runs the business logic routine in a thread, using threads.deferToThread, and adds a callback; said callback executes the db code. That still blocks for now.

So, my current plan is to refactor the tree-->dict code, but call that in a separate thread, and to rewrite the dict-->db layer.

The only question I have is this: since it's a long series of insert/update calls in which I don't need results other than 'it worked', it seems, after reading the comments, like I might be best served by writing a series of straight DB-API calls and wrapping that all in one call to reactor.callInThread.

The db is MySQL, and feeds a PHP-based reporting app, so I can't use pgasync.

Thanks again all.
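One possible shape for that last step, sketched under some assumptions: twisted.enterprise.adbapi's ConnectionPool.runInteraction() runs a function in a pool thread inside a single transaction and passes it a cursor-like object, so a whole series of inserts can live in one plain function. The table, column names and sample data below are hypothetical, and sqlite3 stands in for MySQLdb so the function can be exercised without a server. MySQLdb uses %s rather than ? as its parameter placeholder, so the SQL would need adjusting.

```python
import sqlite3


def insert_records(txn, records):
    """Run every insert for one upload on a single cursor-like object."""
    txn.execute("CREATE TABLE IF NOT EXISTS readings (name TEXT, value TEXT)")
    for name, value in sorted(records.items()):
        txn.execute(
            "INSERT INTO readings (name, value) VALUES (?, ?)",
            (name, value),
        )
    return len(records)


def store(pool, records):
    """Hand the batch to an adbapi pool; returns a Deferred with the row count."""
    return pool.runInteraction(insert_records, records)


# Twisted wiring, shown as comments because it needs a running reactor:
#   from twisted.enterprise import adbapi
#   pool = adbapi.ConnectionPool("MySQLdb", ...)  # connection arguments omitted
#   store(pool, records)

# Exercising the interaction function directly with a plain sqlite3 cursor.
conn = sqlite3.connect(":memory:")
count = insert_records(conn.cursor(), {"job": "1234", "pages": "16"})
conn.commit()
rows = conn.execute("SELECT name, value FROM readings ORDER BY name").fetchall()
print(count, rows)
```

Keeping the SQL in a function that only needs a cursor also makes it easy to test without Twisted or threads.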

participants (5): Andrew Bennetts, Brian Costlow, Justin Warren, Manlio Perillo, Tristan Seligmann