[Twisted-Python] pyactivemq interfering with Twisted?
Hi,

I'm writing what seems to me to be a trivial Twisted client application using a proprietary protocol, and I'm having problems integrating it with ActiveMQ using pyactivemq (a Boost.Python wrapper around the ActiveMQ-CPP library).

What I'm observing is that calling my client protocol's transport.write() doesn't seem to send data to the server. The server, also implemented using Twisted but without pyactivemq, does not seem to have this problem.

Could the part of pyactivemq (or ActiveMQ-CPP) that handles asynchronous consumption of messages be interfering somehow with Twisted's event loop and causing this problem?

Brian
On Tue, 2008-03-18 at 20:04 +0800, Brian Baquiran wrote:
Could the part of pyactivemq (or ActiveMQ-CPP) that handles asynchronous consumption of messages be interfering somehow with Twisted's event loop and causing this problem?
Quite possibly; if the API you're calling is blocking then Twisted's event loop won't be able to run. If that is the case, either figure out if you can integrate the library with Twisted's event loop, or use twisted.internet.threads.deferToThread or something similar to call the blocking API.
On Tue, Mar 18, 2008 at 9:16 PM, Itamar Shtull-Trauring <itamar@itamarst.org> wrote:
On Tue, 2008-03-18 at 20:04 +0800, Brian Baquiran wrote:
Could the part of pyactivemq (or ActiveMQ-CPP) that handles asynchronous consumption of messages be interfering somehow with Twisted's event loop and causing this problem?
Quite possibly; if the API you're calling is blocking then Twisted's event loop won't be able to run. If that is the case, either figure out if you can integrate the library with Twisted's event loop, or use twisted.internet.threads.deferToThread or something similar to call the blocking API.
The API does not block when using an async message listener as I'm doing. Here's some example pyactivemq code that shows how an asynchronous message listener is registered: http://pyactivemq.googlecode.com/svn/trunk/src/examples/asynclistener.py

In my code, I'm calling the protocol's transport.write(data) from within the onMessage() callback.

In my testing, I've also seen cases where I consume a whole lot of messages from ActiveMQ and call transport.write() for each of them, but nothing appears on the server (both the client and server Twisted apps are on the same machine). I wait a while -- a couple of seconds to a minute -- and then suddenly the data arrives on the server in a single dataReceived().

Would it work if I didn't call transport.write() from within the onMessage() callback? Perhaps having some intermediate buffer to avoid having Twisted methods called from inside the message listener?

Thanks,
Brian
On Wed, Mar 19, 2008 at 2:49 AM, Brian Baquiran <brianbaquiran@gmail.com> wrote:
The API does not block when using an async message listener as I'm doing. Here's some example pyactivemq code that shows how an asynchronous message listener is registered: http://pyactivemq.googlecode.com/svn/trunk/src/examples/asynclistener.py
This API actually blocks. Just by looking at:

    conn = f.createConnection()

Which means that you can create a connection and immediately get the result. This is blocking by any relevant definition of blocking.

Judging by the API the only difference between sync and async seems to be related to the type of producer involved. A Push producer is generally considered async and a pull producer is sync.

Looking further in the API of ActiveMQ-CPP, even though I'm not too good with C++, I can easily spot lines like the following:

    Response* response = transport->request( &cmd );

which to me appears as totally synchronous and thus blocking, a non blocking API would have been more like:

    transport->request(&cmd, &onMessage);

The transport itself is implemented in the following way:

    // Start the polling thread.
    thread = new Thread( this );
    thread->start();

and its run method contains:

    try{
        while( !closed ){
            // Read the next command from the input stream.
            Command* command = reader->readCommand();
            // Notify the listener.
            fire( command );
        }
    }

Considering all of this you should probably use this from deferToThread.

--
Valentino Volonghi aka Dialtone
Now running MacOS X 10.5
Home Page: http://www.twisted.it
Valentino Volonghi wrote:
On Wed, Mar 19, 2008 at 2:49 AM, Brian Baquiran <brianbaquiran@gmail.com> wrote:
The API does not block when using an async message listener as I'm doing. Here's some example pyactivemq code that shows how an asynchronous message listener is registered: http://pyactivemq.googlecode.com/svn/trunk/src/examples/asynclistener.py
This API actually blocks.
Just by looking at: conn = f.createConnection()
Which means that you can create a connection and immediately get the result. This is blocking by any relevant definition of blocking.
Considering all of this you should probably use this from deferToThread.
I was able to fix my original problem by using reactor.callFromThread() from the onMessage() callback, so it does look like there is some threading going on. I'll look at where deferToThread should be applied.

Thanks Valentino. I really appreciate your looking at the code.

Brian
Itamar Shtull-Trauring wrote:
On Tue, 2008-03-18 at 20:04 +0800, Brian Baquiran wrote:
Could the part of pyactivemq (or ActiveMQ-CPP) that handles asynchronous consumption of messages be interfering somehow with Twisted's event loop and causing this problem?
Quite possibly; if the API you're calling is blocking then Twisted's event loop won't be able to run. If that is the case, either figure out if you can integrate the library with Twisted's event loop, or use twisted.internet.threads.deferToThread or something similar to call the blocking API.
Hi Itamar,

Thanks a lot for the tip. I just used reactor.callFromThread() from inside the onMessage() callback to execute the Twisted code in the Twisted event loop instead of in the pyactivemq event loop, and it seems to have solved the problem.

Regards,
Brian
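The fix described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the original code: `ForwardingListener` is a hypothetical class, the message is assumed to carry its payload in a `text` attribute, and in real code the class would subclass the listener base class provided by pyactivemq (left out here so the sketch runs without that library). The reactor is passed in rather than imported, which also makes the class easy to exercise with a stand-in object.

```python
class ForwardingListener:
    """Hypothetical listener that forwards message text to a Twisted transport.

    onMessage() is assumed to be invoked on a thread owned by the messaging
    library, not on the thread running the Twisted reactor.
    """

    def __init__(self, reactor, transport):
        self.reactor = reactor      # e.g. twisted.internet.reactor
        self.transport = transport  # the connected protocol's transport

    def onMessage(self, message):
        # Assumption: the payload is a str in message.text.
        data = message.text.encode("utf-8")
        # Twisted transports are not thread-safe, so transport.write() is not
        # called here. callFromThread schedules it to run in the reactor
        # thread and wakes the event loop so the write happens promptly.
        self.reactor.callFromThread(self.transport.write, data)
```

In an application the listener would be built with the real reactor, for example `ForwardingListener(reactor, self.transport)` after `from twisted.internet import reactor`, and then registered with the consumer. Calling transport.write() directly from the library's thread only buffers the data without waking the reactor, which is consistent with the delayed, batched delivery reported earlier in the thread.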
participants (3)
- Brian Baquiran
- Itamar Shtull-Trauring
- Valentino Volonghi