[Twisted-Python] freeing the reactor to do other jobs
I'm using the XMLRPC server in twisted and a few methods call other, sometimes long running, functions/methods. I'm trying to get my brain around how to free the reactor to respond to other requests while this is happening.

A scenario: a call is made to the server, which selects say 10K rows from a db and needs to check each row against a table and, if they do not exist, insert them.

    """ Oversimplified version of the process """
    def getData(self, user_id):
        rows = self.getUserData(user_id)
        for row in rows:
            if self.existsInQueue(row['some_id']):
                continue
            else:
                self.insertQueue(row)

I want the caller to wait on a result from this process, but I also want the reactor to be able to handle other requests as they come in. This function is not directly registered in the xmlrpc server as xmlrpc_getData, but is called by that type of method after validation that it is allowed to run in this context.

What I've seen when this has thousands of rows to process is that the reactor is tied up and cannot respond to requests until complete, which obviously leads me to believe that I'm not using twisted correctly/to its potential. I have read the deferred/asynchronous doc pages, but am having a hard time getting my head around it and would appreciate any advice.

When I don't specifically need the caller to get the final result, I've been using deferToThread, but feel in some of those instances I could possibly write better code rather than sending to a thread.

Thanks
jd
On Fri, Nov 7, 2008 at 1:29 PM, Jeff Dyke <jeff.dyke@gmail.com> wrote:
I want the caller to wait on a result from this process, but I also want the reactor to be able to handle other requests as they come in. This function is not directly registered in the xmlrpc server with xmlrpc_getData, but is called by that type of method after validation that it is allowed to run in this context.
Hi Jeff,

I'll let others tackle your specific twisted-database integration problem, but here are a few general rules of thumb I've found useful when working with twisted, sqlite and wx.

- Any long computations will block your app in general because of the GIL. Threading won't solve this (unless you move the compute-intensive code into a C module which explicitly releases the GIL, which can be hairy). Investigate using reactor.spawnProcess to spawn external processes to do any compute-intensive stuff.

- Try using a database library which releases the GIL - I've found great gains by using apsw instead of the built-in sqlite3 library.

- To integrate I/O-blocking operations, it's useful to use deferToThread, which will block on the operation in another thread in Twisted's threadpool. Just make sure that if you're in other threads you *schedule* calls to the main twisted thread via reactor.callFromThread (and similarly, if you're running wx and twisted in separate threads, via wx.CallAfter).

I hope this is useful.

Cheers,
Reza

--
Reza Lotun
Senior Software Engineer
GetPeer Limited
reza@getpeer.com
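The callFromThread rule in that last bullet can be sketched with stdlib pieces only. The queue-draining loop below is a stand-in for the reactor, not Twisted's actual implementation, and all the names are made up for the sketch:

```python
import queue
import threading

# Stand-in for reactor.callFromThread: worker threads never touch shared
# state directly; they enqueue a callable for the main loop to run.
main_calls = queue.Queue()
results = []

def call_from_thread(fn, *args):
    main_calls.put((fn, args))

def blocking_io():
    data = "rows from the database"        # pretend this blocked for a while
    call_from_thread(results.append, data) # hand the result to the main loop

t = threading.Thread(target=blocking_io)
t.start()
t.join()

# The "reactor" draining its call queue in the main thread.
while not main_calls.empty():
    fn, args = main_calls.get()
    fn(*args)
```

The point of the pattern is that only the main thread ever mutates `results`; the worker thread merely schedules the mutation.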
On Fri, 7 Nov 2008 13:56:53 +0000, Reza Lotun <reza@getpeer.com> wrote:
On Fri, Nov 7, 2008 at 1:29 PM, Jeff Dyke <jeff.dyke@gmail.com> wrote:
I want the caller to wait on a result from this process, but I also want the reactor to be able to handle other requests as they come in. This function is not directly registered in the xmlrpc server with xmlrpc_getData, but is called by that type of method after validation that it is allowed to run in this context.
Hi Jeff,
I'll let others tackle your specific twisted-database integration problem, but here are a few general rules of thumb I've found useful when working with twisted, sqlite and wx.
- Any long-computations will block your app in general because of the GIL. Threading won't solve this (unless you move the compute intensive code into a C-module which explicitly releases the GIL, which can be hairy). Investigate using reactor.spawnProcess to spawn external processes to do any compute intensive stuff.
Long-running computations generally only block the thread they're running in. This shouldn't be surprising, since there's not really any difference between a function that does a "computation" and a function which is somehow just "regular" Python code and isn't a "computation".

You can find details of how threading works in Python in the Python documentation, but briefly: after N bytecodes are executed, the VM running in thread A releases the GIL (required to execute bytecode), and any other thread has a chance to acquire it. By default, N is 100 (enough to multiply about 30 numbers together).

It's only when you bring C into the picture that you have to think about explicitly releasing the GIL to prevent one thread from blocking all the rest. As long as your program is all Python, all your threads will basically play nicely together (an exception to this seems to be time.sleep() on Windows sometimes, which many people find blocks all threads inexplicably - but time.sleep is implemented in C, so this really just proves the point :).

Jean-Paul
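A minimal stdlib demonstration of the point: two pure-Python threads make progress through their loops without any explicit GIL handling, because the interpreter periodically releases the GIL for them.

```python
import threading

counts = {"a": 0, "b": 0}

def work(key):
    # Pure-Python loop: the interpreter periodically releases the GIL,
    # so both threads run to completion without explicit yielding.
    # Each thread touches only its own key, so there is no shared-state race.
    for _ in range(100000):
        counts[key] += 1

threads = [threading.Thread(target=work, args=(k,)) for k in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()
# counts == {"a": 100000, "b": 100000}
```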
On Fri, 7 Nov 2008 08:29:50 -0500, Jeff Dyke <jeff.dyke@gmail.com> wrote:
I'm using the XMLRPC server in twisted and a few methods call other, sometimes long running, functions/methods. I'm trying to get my brain around how to free the reactor to respond to other requests while this is happening.
A scenario. A call is made to the server, which selects say 10K rows from a db and needs to check each row against a table and if they do not exist, insert them.
""" Oversimplified version of the process """ def getData(self,user_id): rows = self.getUserData(user_id) for row in rows: if self.existsInQueue(row['some_id']): continue else: self.insertQueue(row)
I want the caller to wait on a result from this process, but I also want the reactor to be able to handle other requests as they come in. This function is not directly registered in the xmlrpc server with xmlrpc_getData, but is called by that type of method after validation that it is allowed to run in this context.
What I've seen when this has thousands of rows to process is that the reactor is tied up and cannot respond to requests until complete, which obviously leads me to believe that I'm not using twisted correctly/to its potential. I have read the deferred/asynchronous doc pages, but am having a hard time getting my head around it and would appreciate any advice.
When I don't specifically need the caller to get the final result, I've been using deferToThread, but feel in some of those instances I could possibly write better code rather than sending to a thread.
One thing you might not have discovered yet is that even if you use deferToThread, you can still give the final result to the caller. The XML-RPC support in Twisted supports Deferreds - meaning that if an xmlrpc_ method returns a Deferred, then no response is sent to the XML-RPC request until that Deferred fires, and then the result is sent as the XML-RPC response.

Since most libraries for interacting with RDBMSes using SQL present a blocking interface, Twisted includes twisted.enterprise.adbapi, a thin layer on top of DB-API 2.0 which runs all of the blocking stuff in a threadpool. If most of your time is being spent waiting for rows from a database, then adbapi might help you out, and since adbapi gives you Deferreds, this is trivial to integrate into an XML-RPC server.

For other blocking tasks - it depends. If the task is blocking on an event, then transforming that event into a callback (probably using a Deferred, since Deferreds are a good tool to use to manage callbacks) and then putting your code into a callback instead of blocking on the event is the right thing to do. How exactly you turn a particular event into a callback depends on the details of the event, though. If the blocking task is CPU-bound, then running it in another thread or another process can make sense. It's also possible to insert explicit control-flow yields into the implementation of the CPU-bound task (at least, sometimes) so that the reactor can service other event sources as the calculation progresses.

Jean-Paul
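For anyone still getting their head around Deferreds, the core idea can be sketched in a few lines of plain Python. MiniDeferred below is purely illustrative - it omits errbacks and most of the real semantics - but it shows how callbacks attached now run later, when the result arrives:

```python
class MiniDeferred:
    """Toy placeholder for a result that hasn't arrived yet (not Twisted's code)."""

    def __init__(self):
        self.callbacks = []
        self.result = None
        self.fired = False

    def addCallback(self, fn):
        if self.fired:
            # Result already here: run the callback immediately.
            self.result = fn(self.result)
        else:
            # Result pending: remember the callback for later.
            self.callbacks.append(fn)
        return self

    def callback(self, result):
        # The "database" (or any event source) delivers its result;
        # each callback's return value feeds the next one.
        self.fired = True
        self.result = result
        for fn in self.callbacks:
            self.result = fn(self.result)

d = MiniDeferred()
d.addCallback(lambda rows: len(rows))   # runs later, when the rows arrive
d.callback(["row1", "row2"])            # event source delivers its result
# d.result is now 2
```

This is why an xmlrpc_ method can return the Deferred immediately: the response is produced by the callback chain once callback() eventually fires.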
Jeff Dyke wrote:
I'm using the XMLRPC server in twisted and a few methods call other, sometimes long running, functions/methods. I'm trying to get my brain around how to free the reactor to respond to other requests while this is happening.
A scenario. A call is made to the server, which selects say 10K rows from a db and needs to check each row against a table and if they do not exist, insert them.
""" Oversimplified version of the process """ def getData(self,user_id): rows = self.getUserData(user_id) for row in rows: if self.existsInQueue(row['some_id']): continue else: self.insertQueue(row)
You could do something like this, using twisted.internet.task.Cooperator:

    def batch(iterable, size):
        sourceiter = iter(iterable)
        while True:
            batchiter = itertools.islice(sourceiter, size)
            yield itertools.chain([batchiter.next()], batchiter)

    coop = twisted.internet.task.Cooperator()

    def _gotdata(rows):
        d = defer.Deferred()
        def worker():
            # only do 10 rows at a time, then yield control
            for rowg in batch(rows, 10):
                for row in rowg:
                    # do something
                    pass
                yield
            d.callback(True)
        coop.coiterate(worker())
        return d

    class foo:
        def xmlrpc_thing(self, userid):
            d = getUserData(userid)
            d.addCallback(_gotdata)
            return d

You can do similar things with defer.inlineCallbacks, or even just plain deferreds if you want to work hard at it.
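The batch helper above is plain itertools and works standalone, but it relies on Python 2's batchiter.next() and on StopIteration ending the generator. A Python 3 spelling needs next() and an explicit StopIteration check:

```python
import itertools

def batch(iterable, size):
    # Python 3 version of the helper above: next(it) instead of it.next(),
    # and an explicit StopIteration catch (a bare StopIteration no longer
    # silently ends a generator in Python 3).
    sourceiter = iter(iterable)
    while True:
        batchiter = itertools.islice(sourceiter, size)
        try:
            first = next(batchiter)
        except StopIteration:
            return
        yield itertools.chain([first], batchiter)

groups = [list(g) for g in batch(range(7), 3)]
# groups == [[0, 1, 2], [3, 4, 5], [6]]
```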
On 2008.11.07 08:29:50 -0500, Jeff Dyke wrote:
I'm using the XMLRPC server in twisted and a few methods call other, sometimes long running, functions/methods. I'm trying to get my brain around how to free the reactor to respond to other requests while this is happening.
There are two ways:

1. Instead, write functions that don't block for long, but instead do a little bit of work, schedule a call to do the rest of the work, and then return, so the reactor can have the CPU back.

2. Farm out big chunks of work that you can't or don't want to split up to a subprocess or thread.
A scenario. A call is made to the server, which selects say 10K rows from a db and needs to check each row against a table and if they do not exist, insert them.
""" Oversimplified version of the process """ def getData(self,user_id): rows = self.getUserData(user_id) for row in rows: if self.existsInQueue(row['some_id']): continue else: self.insertQueue(row)
If the long-running work is in a blocking database call, and the database does not support a less-blocking version and you can't change the database, then you probably want to use deferToThread for that part. And then move the rest of getData into a separate function, that gets called in a callback after getUserData finishes.

    def getData(self, user_id):
        # deferToThread lives in twisted.internet.threads, not on the reactor
        deferred1 = threads.deferToThread(self.getUserData, user_id)
        deferred1.addCallback(self._addRowsToQueue)
        deferred1.addErrback(self._getUserDataFailed)

If adding the rows to the queue is fast, then you're done. Just move everything after getUserData into _addRowsToQueue.

    def _addRowsToQueue(self, rows):
        for row in rows:
            if not self.existsInQueue(row['some_id']):
                self.insertQueue(row)

But if adding all the rows to the queue in one function call is too slow, then you need to split it up. It's a loop, so splitting it up is easy. Here's the simple scheduling-only version:

    def _addSomeRowsToQueue(self, rows):
        if rows:
            row = rows.pop(0)
            if not self.existsInQueue(row['some_id']):
                self.insertQueue(row)
            reactor.callLater(0, self._addSomeRowsToQueue, rows)

Adding deferreds to the mix so that a callback function is called when all the rows are added to the queue is the next step, after you understand how this much works.
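The callLater(0, ...) trick can be illustrated with a toy run queue standing in for the reactor. Every name below is made up for the sketch; the point is only that each call does one row and then reschedules itself, leaving gaps where other events could run:

```python
import collections

# Minimal stand-in for a reactor's timed-call queue (illustrative only).
pending = collections.deque()

def call_later_0(fn, *args):
    pending.append((fn, args))

def add_some_rows(rows, queue):
    # Process one row per call, then hand control back by rescheduling.
    if rows:
        row = rows.pop(0)
        if row not in queue:          # stand-in for existsInQueue
            queue.append(row)         # stand-in for insertQueue
        call_later_0(add_some_rows, rows, queue)

queue = []
call_later_0(add_some_rows, [1, 2, 2, 3], queue)

# The "reactor" draining its queue; between iterations it could
# service any other pending event sources.
while pending:
    fn, args = pending.popleft()
    fn(*args)
# queue is now [1, 2, 3]
```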
I want the caller to wait on a result from this process, but I also want the reactor to be able to handle other requests as they come in.
I hope you can live with "I want something to happen using the results from this process", rather than "I want the caller to wait on a result from this process." You can simulate blocking flow somewhat with deferredGenerator or inlineCallbacks, but I recommend sticking to the old way at first. It's simpler and less magical.

--
David Ripton
dripton@ripton.net
On Fri, Nov 7, 2008 at 10:26 AM, David Ripton <dripton@ripton.net> wrote:
On 2008.11.07 08:29:50 -0500, Jeff Dyke wrote:
I'm using the XMLRPC server in twisted and a few methods call other, sometimes long running, functions/methods. I'm trying to get my brain around how to free the reactor to respond to other requests while this is happening.
There are two ways:
1. Instead, write functions that don't block for long, but instead do a little bit of work, schedule a call to do the rest of the work, and then return, so the reactor can have the CPU back.
2. Farm out big chunks of work that you can't or don't want to split up to a subprocess or thread.
A scenario. A call is made to the server, which selects say 10K rows from a db and needs to check each row against a table and if they do not exist, insert them.
""" Oversimplified version of the process """ def getData(self,user_id): rows = self.getUserData(user_id) for row in rows: if self.existsInQueue(row['some_id']): continue else: self.insertQueue(row)
If the long-running work is in a blocking database call, and the database does not support a less-blocking version and you can't change the database, then you probably want to use deferToThread for that part.
And then move the rest of getData into a separate function, that gets called in a callback after getUserData finishes.
    def getData(self, user_id):
        deferred1 = threads.deferToThread(self.getUserData, user_id)
        deferred1.addCallback(self._addRowsToQueue)
        deferred1.addErrback(self._getUserDataFailed)
If adding the rows to the queue is fast, then you're done. Just move everything after getUserData into _addRowsToQueue.
    def _addRowsToQueue(self, rows):
        for row in rows:
            if not self.existsInQueue(row['some_id']):
                self.insertQueue(row)
But if adding all the rows to the queue in one function call is too slow, then you need to split it up. It's a loop, so splitting it up is easy. Here's the simple scheduling-only version:
    def _addSomeRowsToQueue(self, rows):
        if rows:
            row = rows.pop(0)
            if not self.existsInQueue(row['some_id']):
                self.insertQueue(row)
            reactor.callLater(0, self._addSomeRowsToQueue, rows)
Adding deferreds to the mix so that a callback function is called when all the rows are added to the queue is the next step, after you understand how this much works.
I want the caller to wait on a result from this process, but I also want the reactor to be able to handle other requests as they come in.
I hope you can live with "I want something to happen using the results from this process", rather than "I want the caller to wait on a result from this process."
You can simulate blocking flow somewhat with deferredGenerator or inlineCallbacks, but I recommend sticking to the old way at first. It's simpler and less magical.
--
David Ripton
dripton@ripton.net
Thanks all for the input... this will be a great help.
_______________________________________________
Twisted-Python mailing list
Twisted-Python@twistedmatrix.com
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
participants (5)
- David Ripton
- Jean-Paul Calderone
- Jeff Dyke
- Phil Mayers
- Reza Lotun