[Twisted-Python] reactor.stop() won't, threads and Queue to blame?

Hi,

I can't seem to make reactor.stop() actually stop the reactor and allow my program to exit. I hope someone can help.

What I'm actually trying to do is implement an execution pipeline so I can serialize certain jobs but still run them in a thread so the main loop can go about its business. I do this with a class, CommandQueue, that has a Queue.Queue and a method, which can be called via reactor.callInThread(), that slurps the queue and runs any callables sent down it. Each callable's result is delivered through a Deferred. This thread loops until I set CommandQueue.stop=True. (Maybe someone knows a better way?)

To try to trigger the shutdown, I add a callback to the Deferred associated with the final callable sent down the queue. If I instead call the shutdown function via reactor.callLater(), then the reactor will actually stop.

I don't know if it matters, but /bin/ps only shows one instance of python running. Usually I see one instance per thread.

The code below and its output show the problem.

#!/usr/bin/env python2.3
from twisted.python import threadable
threadable.init(1)
from twisted.internet import reactor,defer
from Queue import Queue,Empty

class CommandQueue:
    '''Queue up commands for serial calling. One must call the
    drain() method to start reading the internal queue. Most likely
    one wants to call this in a thread.'''

    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.stop = False
        return

    def __call__(self,meth,*a,**k):
        '''Call meth(*a,**k) when it reaches end of queue.
        Returns a Deferred that will pass the return of meth.'''
        deferred = defer.Deferred()
        self.queue.put((deferred,meth,a,k))
        return deferred

    def drain(self):
        'Drain the command queue until CommandQueue.stop is True'
        while not self.stop:
            try:
                d,meth,a,k = self.queue.get(True,1)
            except Empty:
                print " queue empty"
                continue
            print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
            d.callback(meth(*a,**k))
            print "callback done"
        print "drain closing"
        return 0

def test1():
    import time
    cq = CommandQueue()
    reactor.callInThread(cq.drain)

    def shutdown(x=None):
        print "Stopping CommandQueue"
        cq.stop = True
        print "Stopping reactory"
        reactor.stop()
        print "reactor.stop()'ed"

    def burp(x):
        for n in range(0,x):
            time.sleep(1)
            print x,n
        return x

    def chirp(x):
        print "okay:",x
        return None

    def ouch(x):
        print "bad:",x
        return x

    last = 3
    for n in range(0,last):
        print "dispatching",n
        d = cq(burp,n).addCallbacks(chirp,ouch)
        if last-n == 1:
            d.addCallbacks(shutdown,ouch)

if __name__ == '__main__':
    print "running test1"
    test1()
    print "end test1"
    reactor.run()
    print "reactor exitted"
#=-------------- end ------------=#

Running this produces the following:

[i386]bviren@aviator:test> ./test-commandqueue.py
running test1
dispatching 0
dispatching 1
dispatching 2
end test1
calling burp((0,),{})
okay: 0
callback done
calling burp((1,),{})
1 0
okay: 1
callback done
calling burp((2,),{})
2 0
2 1
okay: 2
Stopping CommandQueue
Stopping reactory
reactor.stop()'ed
callback done
drain closing

At which point I have to hit Control-C and finally get:

reactor exitted

Here is the mod to use reactor.callLater():

...
    last = 3
    for n in range(0,last):
        print "dispatching",n
        d = cq(burp,n).addCallbacks(chirp,ouch)
        #if last-n == 1:
        #    d.addCallbacks(shutdown,ouch)
    reactor.callLater(10,shutdown)
...

So, any ideas as to what I'm doing wrong?

Thanks,
-Brett.

Brett,

For starters, I'd keep the 'Queue' in the main thread, and use callInThread to dispatch the function with its arguments. Use d.addBoth (aka finally) to pop the next item from the queue and then do a callInThread for it.

However, if you want to keep the Queue in the secondary thread, you have one problem that is obvious to me:

On Sun, Oct 24, 2004 at 08:39:07PM -0400, Brett Viren wrote:
| class CommandQueue:
| ...
|     def drain(self):
|         'Drain the command queue until CommandQueue.stop is True'
|         while not self.stop:
|             try:
|                 d,meth,a,k = self.queue.get(True,1)
|             except Empty:
|                 print " queue empty"
|                 continue
|             print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
|             d.callback(meth(*a,**k))
|             print "callback done"
|         print "drain closing"
|         return 0
|
| def test1():
|     import time
|     cq = CommandQueue()
|     reactor.callInThread(cq.drain)
|

You seem to be calling d.callback in the secondary thread rather than in the primary thread. This could be causing some of the problems you are experiencing. It's not customary to use Deferreds in any thread other than the main thread.
Cheers! Clark
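For comparison, here is a minimal sketch of the design Clark suggests first (queue owned by the main thread, each job dispatched to a thread, the next one popped when the previous one completes). It is written with Python 3's asyncio rather than Twisted, so it is a swapped-in analog: `loop.run_in_executor()` stands in for `callInThread`/`deferToThread`, an asyncio future stands in for the Deferred, and `add_done_callback()` plays the role of `addBoth`. The `SerialRunner` class and the `burp` job are hypothetical names for illustration.

```python
import asyncio
import collections
import threading
import time


class SerialRunner:
    """Hypothetical asyncio analog of the "queue in the main thread" design.

    Jobs wait in a deque owned by the event-loop thread; each job is handed
    to a worker thread only after the previous one has finished.
    """

    def __init__(self, loop):
        self.loop = loop
        self.pending = collections.deque()
        self.running = False

    def submit(self, func, *args):
        """Queue func(*args); must be called from the event-loop thread."""
        result = self.loop.create_future()
        self.pending.append((result, func, args))
        if not self.running:
            self._run_next()
        return result

    def _run_next(self):
        # Always runs in the event-loop thread.
        if not self.pending:
            self.running = False
            return
        self.running = True
        result, func, args = self.pending.popleft()
        job = self.loop.run_in_executor(None, func, *args)
        # Like addBoth: fires on success or failure, then starts the next job.
        job.add_done_callback(lambda done: self._finish(done, result))

    def _finish(self, job, result):
        if not result.done():  # the caller may have cancelled it
            if job.cancelled():
                result.cancel()
            elif job.exception() is not None:
                result.set_exception(job.exception())
            else:
                result.set_result(job.result())
        self._run_next()


async def main():
    runner = SerialRunner(asyncio.get_running_loop())
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def burp(x):
        # Stand-in for a slow, blocking job; records how many run at once.
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(0.05)
        with lock:
            state["active"] -= 1
        return x

    futures = [runner.submit(burp, n) for n in range(3)]
    results = await asyncio.gather(*futures)
    return results, state["peak"]


results, peak = asyncio.run(main())
print(results)  # [0, 1, 2]
print(peak)     # 1
```

Because `_run_next()` only ever runs in the event-loop thread, the deque needs no lock; the only lock in the demo guards the concurrency counter that the worker threads update.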

On Sun, Oct 24, 2004 at 10:59:09PM -0400, Clark C. Evans wrote: [...]
> However, if you want to keep the Queue in the secondary thread, you have one problem that is obvious to me:
[...]
Yep, that's the problem here. Change this:

    d.callback(meth(*a,**k))

to this:

    reactor.callFromThread(d.callback, meth(*a, **k))

(Or perhaps less confusingly:

    result = meth(*a, **k)
    reactor.callFromThread(d.callback, result)

)
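The same rule exists in Python 3's asyncio, which makes a convenient self-contained illustration (an analog, not Twisted's API): `loop.call_soon_threadsafe()` plays the part of `reactor.callFromThread()`. In this sketch the hypothetical `worker` helper computes its value in a secondary thread but only resolves the future from the loop thread, and the final check confirms that the future's callback really ran in the main thread.

```python
import asyncio
import threading


def worker(loop, future, func, *args):
    """Runs in a secondary thread (hypothetical helper)."""
    value = func(*args)  # the blocking work happens here, off the loop
    # Hand the result back to the event-loop thread; this plays the role of
    # reactor.callFromThread(d.callback, value) in the Twisted code above.
    loop.call_soon_threadsafe(future.set_result, value)


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    callback_threads = []
    # Done-callbacks are invoked by the loop, so they run in the loop thread.
    future.add_done_callback(
        lambda f: callback_threads.append(threading.current_thread()))
    t = threading.Thread(target=worker, args=(loop, future, pow, 2, 10))
    t.start()
    value = await future
    await asyncio.sleep(0)  # let any remaining done-callbacks run
    t.join()
    return value, callback_threads


value, callback_threads = asyncio.run(main())
print(value)                                          # 1024
print(callback_threads == [threading.main_thread()])  # True
```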
There's no reason why Deferreds wouldn't work in another thread, if that's what you want. It's just that generally it's not what you want... Deferreds are used in Twisted to deal with asynchronous operations, but in non-event-loop threads you'd usually just block. If for some reason there were two event-loop threads in the one process, then Deferreds might be useful in both.

Nothing about Deferreds is at all dependent on the reactor, except for the ill-conceived setTimeout functionality. If you want to run a callback chain in another thread, then Twisted shouldn't stop you (but I would expect you to comment your code very clearly to explain why, as it would be very unusual). This is just a long-winded way of saying that the Deferred implementation should be completely thread-ignorant, even though in practice Deferreds are only used from the main thread.

The real error here wasn't using Deferred.callback in another thread; it was using reactor.stop in that thread.

-Andrew.
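Andrew's point about reactor.stop has a close parallel in asyncio, where `loop.stop()` is likewise not meant to be called from another thread: the loop may stay blocked waiting for I/O and never notice. This minimal sketch, assuming asyncio as the stand-in for the reactor and a hypothetical `last_job` function as the final queued command, asks the loop thread to stop itself instead. In Twisted terms the equivalent request would be `reactor.callFromThread(reactor.stop)`.

```python
import asyncio
import threading
import time


def last_job(loop, log):
    """Runs in a secondary thread, like the final command in the queue."""
    time.sleep(0.05)  # stand-in for real work
    log.append("job finished")
    # Calling loop.stop() directly here would touch the loop from the wrong
    # thread, and the loop could stay asleep in its selector. Instead,
    # schedule the stop so that it runs inside the loop thread.
    loop.call_soon_threadsafe(loop.stop)


loop = asyncio.new_event_loop()
log = []
worker = threading.Thread(target=last_job, args=(loop, log))
worker.start()
loop.run_forever()  # returns once loop.stop() has run in this thread
log.append("loop exited")
worker.join()
loop.close()
print(log)  # ['job finished', 'loop exited']
```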

Andrew Bennetts <andrew-twisted@puzzling.org> writes:
Yes, that did it.
> There's no reason why Deferreds wouldn't work in another thread, if that's what you want. It's just that generally it's not what you want...
In this case the Deferred is used as a return value for Twisted's XML-RPC server implementation. I go to the trouble of a CommandQueue because my system blurs the distinction between server and client, and this was leading to deadlocks. This CommandQueue should make sure that all the troublesome communications are atomic.

Thanks to both you and Clark for your help.
-Brett.

On Mon, 2004-10-25 at 12:41 -0400, Brett Viren wrote:
Doing things in threads almost always makes things *less* atomic than just leaving them all in the main reactor thread.

Even if I'm totally mistaken, I feel like I have to ask a few questions to make sure that newbies don't stumble across this thread in the future and think they need to start managing their own threadpools so Twisted won't deadlock ;)

When you say you're "blurring the distinction between server and client", do you mean you're implementing something like an XML-RPC proxy, where the server is itself a client, relaying requests elsewhere and waiting for their results? Or something else?

Were you running requests in threads before you came up with the CommandQueue abstraction? If not, what caused the deadlocks? How was the client/server blurring related to the deadlocks?

Finally, did you consider an approach where, rather than queueing commands, you just executed them synchronously and let the reactor serialize them? If so, what led to the decision to change to a thread-based approach?
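To make Glyph's last question concrete: in a single-threaded event loop, synchronous callbacks are serialized for free, because the loop runs each one to completion before starting the next. The sketch below shows this with asyncio's `loop.call_soon()` standing in for the reactor (an assumption for illustration; `command` is a hypothetical name). The log never interleaves two commands, and no lock or queue is involved.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    log = []

    def command(name):
        # A synchronous command: it runs to completion in the loop thread,
        # and the loop cannot start another callback until it returns.
        log.append("start " + name)
        log.append("end " + name)

    for name in ("a", "b", "c"):
        loop.call_soon(command, name)
    await asyncio.sleep(0)  # yield once so the queued callbacks run
    return log


log = asyncio.run(main())
print(log)
# ['start a', 'end a', 'start b', 'end b', 'start c', 'end c']
```

The trade-off is the one Brett raises in his reply: a synchronous command that takes a long time stalls everything else on the loop while it runs.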

Glyph Lefkowitz <glyph@divmod.com> writes:
It is basically as you describe, but with some additions. The primary aim is to marshal data from an XML-RPC client to a server using a custom protocol while providing status information as well as control.

                XML-RPC           Custom
       data     ---->    proxy    --->     data
       source   <----    proxy             sink
                          ^  |
                         /|\ |
                          |  |  XML-RPC
                          | \|/
                          |  V
                   GUI Monitor/Control

The data source listens (is a server) for data requests, which include a callback URL. After that, it sends data to (is a client for) the proxy, which forwards the data to the data sink and sends a confirmation to the GUI monitor. The proxy also sends heartbeats, fired via reactor.callLater, to the GUI.
Yes. In the proxy, I handle the XML-RPC requests from the data source and the GUI via this class:

class Spawner(threading.Thread):
    '''Call callable in its own thread, return value is sent into
    the Spawner.deferred.callback()'''
    def __init__(self,callable,errable=None,**kwds):
        threading.Thread.__init__(self,**kwds)
        self.callable = callable
        if errable is None: errable = self.chirp
        self.deferred = defer.Deferred()
        self.deferred.addErrback(errable)
        self.setDaemon(1)
        self.start()
        return
    def chirp(self,*args):
        print str(args)
        log.error(str(args))
        return args
    def run(self):
        self.deferred.callback(self.callable())

This runs the request in a thread and returns the value via a Deferred (which is used as the return value for the XML-RPC method).
The basic data proxying must not be interrupted. Some of the control requests sent from the GUI can take longer than the period between data updates and would thus block that proxying.

It's possible I'm doing something stupid in this design. Please let me know if you have improvements.

Thanks,
-Brett.
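One common way to keep slow requests from stalling periodic work is to push only the blocking call into a thread and keep everything else in the loop thread; in Twisted that is what threads.deferToThread() does, and Clark's rewrite in this thread builds on it. Here is a sketch of the effect using asyncio's `run_in_executor()` as a stand-in, with a `call_later` heartbeat modeled on the proxy's reactor.callLater heartbeats. `slow_control_request` and `heartbeat` are hypothetical, and the timing values are arbitrary.

```python
import asyncio
import time


def slow_control_request():
    # Blocking work that takes longer than one heartbeat period.
    time.sleep(0.3)
    return "control done"


def heartbeat(loop, beats, period, handles):
    # Re-arms itself with call_later, like the reactor.callLater heartbeats.
    beats.append(loop.time())
    handles.append(
        loop.call_later(period, heartbeat, loop, beats, period, handles))


async def main():
    loop = asyncio.get_running_loop()
    beats, handles = [], []
    heartbeat(loop, beats, 0.05, handles)
    # The blocking request runs in the default thread pool, so the loop
    # keeps firing heartbeats while it waits for the result.
    result = await loop.run_in_executor(None, slow_control_request)
    handles[-1].cancel()  # stop the heartbeat before the loop shuts down
    return result, len(beats)


result, beat_count = asyncio.run(main())
print(result)      # control done
print(beat_count)  # several beats; the exact count depends on timing
```

If `slow_control_request()` were called directly inside `main()`, the loop would be blocked for the whole request and only the first beat would be recorded.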

On Mon, Oct 25, 2004 at 12:41:12PM -0400, Brett Viren wrote:
| Andrew Bennetts <andrew-twisted@puzzling.org> writes:
|
| > Yep, that's the problem here. Change this:
| >     d.callback(meth(*a,**k))
| > to this:
| >     reactor.callFromThread(d.callback, meth(*a, **k))
| >
| > (Or perhaps less confusingly:
| >     result = meth(*a, **k)
| >     reactor.callFromThread(d.callback, result)
| > )
|
| Yes, that did it.

If your meth() could raise an exception, you probably also need to wrap that function call in a try/except block:

    from twisted.python import failure

    try:
        result = meth(*a, **k)
    except:
        result = failure.Failure()
    reactor.callFromThread(d.callback, result)

Best,
Clark
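The same idea in asyncio terms, as a hedged sketch rather than Twisted code: the worker catches the exception and ships it to the loop thread with `future.set_exception`, so the awaiting code sees it much as an errback would see a Failure. The `worker` and `run_in_thread` helpers are hypothetical. Unlike Clark's bare `except:`, the sketch catches only `Exception`, so things like `KeyboardInterrupt` are not swallowed.

```python
import asyncio
import threading


def worker(loop, future, func, *args):
    """Runs in a secondary thread and reports success or failure."""
    try:
        value = func(*args)
    except Exception as exc:
        # Counterpart of result = failure.Failure(): ship the exception
        # itself back to the loop thread instead of losing it.
        loop.call_soon_threadsafe(future.set_exception, exc)
    else:
        loop.call_soon_threadsafe(future.set_result, value)


async def run_in_thread(func, *args):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    t = threading.Thread(target=worker, args=(loop, future, func) + args)
    t.start()
    try:
        return await future
    finally:
        t.join()


async def main():
    ok = await run_in_thread(int, "42")
    try:
        await run_in_thread(int, "not a number")
    except ValueError as exc:
        failed = type(exc).__name__
    else:
        failed = None
    return ok, failed


ok, failed = asyncio.run(main())
print(ok, failed)  # 42 ValueError
```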

"""
Sometimes you want to make sure that only one secondary thread is
being used for a sequence of calls.  This can be accomplished with
a Queue as proposed by Brett Viren.
"""
from twisted.internet import reactor,defer,threads
from Queue import Queue, Empty

class CommandQueue:
    '''
    Queue up commands for serial calling.  Each command is run in a
    secondary thread via threads.deferToThread(), and the next command
    is only started once the previous one has finished.
    '''
    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.running = False
        return

    def _cbRunQueue(self, cbval = None):
        try:
            # Non-blocking get: this runs in the reactor thread, so it
            # must not sit waiting on an empty queue.
            cd,meth,a,k = self.queue.get(False)
            d = threads.deferToThread(meth, *a, **k)
            d.addBoth(self._cbRunQueue)
            d.chainDeferred(cd)
        except Empty:
            self.running = False
        return cbval

    def __call__(self,meth,*a,**k):
        '''Call meth(*a,**k) when it reaches end of queue.
        Returns a Deferred that will pass the return of meth.'''
        d = defer.Deferred()
        self.queue.put((d,meth,a,k))
        if not self.running:
            self.running = True
            self._cbRunQueue()
        return d

def test1():
    import time
    cq = CommandQueue()

    def shutdown(x=None):
        print "Stopping reactory"
        reactor.stop()
        print "reactor.stop()'ed"

    def burp(x):
        for n in range(0,x):
            time.sleep(1)
            print x,n
        return x

    def chirp(x):
        print "okay:",x
        return None

    def ouch(x):
        print "bad:",x
        return x

    last = 3
    for n in range(0,last):
        print "dispatching",n
        d = cq(burp,n).addCallbacks(chirp,ouch)
        if last-n == 1:
            d.addCallbacks(shutdown,ouch)

if __name__ == '__main__':
    print "running test1"
    test1()
    print "end test1"
    reactor.run()
    print "reactor exitted"

-- 
Clark C. Evans                    Prometheus Research, LLC.
                                  http://www.prometheusresearch.com/
office: +1.203.777.2550
mobile: +1.203.444.0557
Prometheus Research: Transforming Data Into Knowledge
 - Research Exchange Database
 - Survey & Assessment Technologies
 - Software Tools for Researchers

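Clark's chaining guarantees that the queued calls run one after another, but deferToThread hands each call to the reactor's thread pool, so as far as I understand the calls are not guaranteed to land on the same pool thread. If the literal "only one secondary thread" matters, a single-worker pool is one way to get it. This sketch uses the standard library's concurrent.futures rather than Twisted (a swapped-in technique); in an asyncio program the same executor could be passed to `loop.run_in_executor()`. `burp` is a hypothetical stand-in for the queued jobs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def burp(x):
    time.sleep(0.01 * x)  # stand-in for a blocking job
    return x, threading.get_ident()


# max_workers=1 means the pool never creates a second thread, so jobs run
# one at a time, in submission order, on the same secondary thread.
with ThreadPoolExecutor(max_workers=1) as executor:
    futures = [executor.submit(burp, n) for n in range(3)]
    results = [f.result() for f in futures]

values = [value for value, _ in results]
thread_ids = {ident for _, ident in results}
print(values)                               # [0, 1, 2]
print(len(thread_ids))                      # 1
print(threading.get_ident() in thread_ids)  # False: not the calling thread
```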
participants (4)
- Andrew Bennetts
- Brett Viren
- Clark C. Evans
- Glyph Lefkowitz