[Twisted-Python] IPushProducer - medium volume streaming
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
I've got an IPushProducer written with these 3 methods and it works fine - i.e. resumeProducing() is called, then eventually pauseProducing is called() when the producer produces data too fast; when the client disconnects early, the stopProducing method is called, etc.. def pauseProducing(self): def resumeProducing(self): def stopProducing(self): My problem is, the source of the data (a slow non-relational database) that I send back to the client from within resumeProducing generates data kind of slowly. As in, it is quite rare that pauseProducing is ever called in my tests. I feel that the data source generates data too slowly, and have figured out I will block the entire server during the resumeProducing operation (for example, during the first call to resumeProducing, I do not generate any data at all for the first 20 seconds during bootstrapping). At this point, I am thinking about the following ideas of code changes to prevent blockage of the server: 1) forking off a thread so that I will not block other clients from talking to the server. I will not have very many clients using this producer, so this would not result in any large numbers of threads which I know can be a problem. I know how to spawn Python threads and use socket APIs, but it is not clear to me how I would get at the socket descriptor from within the twisted framework (I am using a LineReceiver subclass, which in this case is spawning the IPushProducer). Are there any examples of spawning off a thread from a LineReceiver class and communicating using blocking calls within said thread? 2) returning from resumeProducing after a few seconds of production, even though I could produce more, so I do not block the server. I have tested this, it works, and does let other clients get in, but I still feel it is suboptimal, since my data source is so slow, it still blocks the server during each of those few second intervals. Any recommendations here? Thank you.
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/26/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
1) forking off a thread so that I will not block other clients from talking to the server.
If this is a SQL database with which you're using a dbapi module, you should probably be using twisted.enterprise.adbapi to make access to that database asynchronous. Under the hood it's using threads to make the interface asynchronous. Otherwise, if the only programmatic interface this database of yours has is blocking, then you'll probably need to use a thread yourself. See twisted.internet.threads.deferToThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
This isn't a SQL db so I can't use adbapi. So using deferToThread, inside my thread, when I have a result to write to the client, I call callFromThread to schedule a write in the reactor loop. My question is, how exactly do I use callFromThread to write to the client from the thread (I use the LineReceiver class)? (specifically, what context do I pass to the thread func, to pass into callFromThread call, to make sure it writes back to the right client?) -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Friday, October 26, 2007 7:19 PM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/26/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
If this is a SQL database with which you're using a dbapi module, you should probably be using twisted.enterprise.adbapi to make access to that database asynchronous. Under the hood it's using threads to make the interface asynchronous. Otherwise, if the only programmatic interface this database of yours has is blocking, then you'll probably need to use a thread yourself. See twisted.internet.threads.deferToThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, if you're using deferToThread, the way you give the result back to the mainloop is by returning the value from the function. Then that result gets fired on the Deferred that deferToThread created. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
It's a long-running streaming flow so there will be millions of results to send to the client. You're saying I call deferToThread millions of times? -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Sunday, October 28, 2007 10:51 AM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, if you're using deferToThread, the way you give the result back to the mainloop is by returning the value from the function. Then that result gets fired on the Deferred that deferToThread created. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, I was just correcting your description of the way that deferToThread is used. If you don't want to use deferToThread, then you should probably use a combination of reactor.callInThread and reactor.callFromThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
Thanks. Now using callInThread to launch a long-running function on the server and callFromThread from said function to write results to the client. I'm still puzzled how exactly 'reactor.callFromThread(self.transport.writeSomeData(...))' gets routed to the right client when called from my function. How does twisted know which client that message is going to? After all, it calls it from the main reactor loop. i.e. what state does it use to get this right? -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Sunday, October 28, 2007 12:11 PM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, I was just correcting your description of the way that deferToThread is used. If you don't want to use deferToThread, then you should probably use a combination of reactor.callInThread and reactor.callFromThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/426d6dbf6554a9b3fca1fd04e6b75f38.jpg?s=120&d=mm&r=g)
On Mon, 2007-10-29 at 09:21 -0400, Rutt, Benjamin wrote:
I presume you mean: reactor.callFromThread(self.transport.writeSomeData, ...) Note the subtle difference
to the right client when called from my function. How does twisted know
The "transport" object is a bound connection and contains the address of the endpoint (if you're using TCP that is)
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/29/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
"self.transport" is associated with the particular connection. Also, there are a couple of other things wrong about your snippet: 1. use "write", not "writeSomeData". writeSomeData is an internal implementation detail. 2. You're actually calling writeSomeData and passing the *result* of that to callFromThread. That's wrong; you need to pass a callable and its arguments to callFromThread. So instead of reactor.callFromThread(foo(a, b)), you write reactor.callFromThread(foo, a, b). That means you should ultimately be using:: reactor.callFromThread(self.transport.write, data) -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
Thanks Chris and Phil. My mistake for writing 'reactor.callFromThread(self.transport.writeSomeData(...))' not 'reactor.callFromThread(self.transport.writeSomeData, ...)' the latter which I'm actually using. Also I fixed to use write() not writeSomeData(), thanks. Regarding state, I take it each new client connection to my server results in a new instance of my LineReceiver subclass. So therefore the self.transport object contains the TCP socket fd in there somewhere - makes sense, thanks. -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Monday, October 29, 2007 10:57 AM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/29/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote: the
main reactor loop. i.e. what state does it use to get this right?
"self.transport" is associated with the particular connection. Also, there are a couple of other things wrong about your snippet: 1. use "write", not "writeSomeData". writeSomeData is an internal implementation detail. 2. You're actually calling writeSomeData and passing the *result* of that to callFromThread. That's wrong; you need to pass a callable and its arguments to callFromThread. So instead of reactor.callFromThread(foo(a, b)), you write reactor.callFromThread(foo, a, b). That means you should ultimately be using:: reactor.callFromThread(self.transport.write, data) -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/26/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
1) forking off a thread so that I will not block other clients from talking to the server.
If this is a SQL database with which you're using a dbapi module, you should probably be using twisted.enterprise.adbapi to make access to that database asynchronous. Under the hood it's using threads to make the interface asynchronous. Otherwise, if the only programmatic interface this database of yours has is blocking, then you'll probably need to use a thread yourself. See twisted.internet.threads.deferToThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
This isn't a SQL db so I can't use adbapi. So using deferToThread, inside my thread, when I have a result to write to the client, I call callFromThread to schedule a write in the reactor loop. My question is, how exactly do I use callFromThread to write to the client from the thread (I use the LineReceiver class)? (specifically, what context do I pass to the thread func, to pass into callFromThread call, to make sure it writes back to the right client?) -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Friday, October 26, 2007 7:19 PM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/26/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
If this is a SQL database with which you're using a dbapi module, you should probably be using twisted.enterprise.adbapi to make access to that database asynchronous. Under the hood it's using threads to make the interface asynchronous. Otherwise, if the only programmatic interface this database of yours has is blocking, then you'll probably need to use a thread yourself. See twisted.internet.threads.deferToThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, if you're using deferToThread, the way you give the result back to the mainloop is by returning the value from the function. Then that result gets fired on the Deferred that deferToThread created. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
It's a long-running streaming flow so there will be millions of results to send to the client. You're saying I call deferToThread millions of times? -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Sunday, October 28, 2007 10:51 AM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, if you're using deferToThread, the way you give the result back to the mainloop is by returning the value from the function. Then that result gets fired on the Deferred that deferToThread created. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, I was just correcting your description of the way that deferToThread is used. If you don't want to use deferToThread, then you should probably use a combination of reactor.callInThread and reactor.callFromThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
Thanks. Now using callInThread to launch a long-running function on the server and callFromThread from said function to write results to the client. I'm still puzzled how exactly 'reactor.callFromThread(self.transport.writeSomeData(...))' gets routed to the right client when called from my function. How does twisted know which client that message is going to? After all, it calls it from the main reactor loop. i.e. what state does it use to get this right? -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Sunday, October 28, 2007 12:11 PM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/28/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
No, I was just correcting your description of the way that deferToThread is used. If you don't want to use deferToThread, then you should probably use a combination of reactor.callInThread and reactor.callFromThread. -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
![](https://secure.gravatar.com/avatar/426d6dbf6554a9b3fca1fd04e6b75f38.jpg?s=120&d=mm&r=g)
On Mon, 2007-10-29 at 09:21 -0400, Rutt, Benjamin wrote:
I presume you mean: reactor.callFromThread(self.transport.writeSomeData, ...) Note the subtle difference
to the right client when called from my function. How does twisted know
The "transport" object is a bound connection and contains the address of the endpoint (if you're using TCP that is)
![](https://secure.gravatar.com/avatar/3a7e70f3ef2ad1539da42afc85c8d09d.jpg?s=120&d=mm&r=g)
On 10/29/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote:
"self.transport" is associated with the particular connection. Also, there are a couple of other things wrong about your snippet: 1. use "write", not "writeSomeData". writeSomeData is an internal implementation detail. 2. You're actually calling writeSomeData and passing the *result* of that to callFromThread. That's wrong; you need to pass a callable and its arguments to callFromThread. So instead of reactor.callFromThread(foo(a, b)), you write reactor.callFromThread(foo, a, b). That means you should ultimately be using:: reactor.callFromThread(self.transport.write, data) -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/
![](https://secure.gravatar.com/avatar/e1aa8babe2506302322c4c369db501cd.jpg?s=120&d=mm&r=g)
Thanks Chris and Phil. My mistake for writing 'reactor.callFromThread(self.transport.writeSomeData(...))' not 'reactor.callFromThread(self.transport.writeSomeData, ...)' the latter which I'm actually using. Also I fixed to use write() not writeSomeData(), thanks. Regarding state, I take it each new client connection to my server results in a new instance of my LineReceiver subclass. So therefore the self.transport object contains the TCP socket fd in there somewhere - makes sense, thanks. -----Original Message----- From: twisted-python-bounces@twistedmatrix.com [mailto:twisted-python-bounces@twistedmatrix.com] On Behalf Of Christopher Armstrong Sent: Monday, October 29, 2007 10:57 AM To: Twisted general discussion Subject: Re: [Twisted-Python] IPushProducer - medium volume streaming On 10/29/07, Rutt, Benjamin <Benjamin.Rutt@gs.com> wrote: the
main reactor loop. i.e. what state does it use to get this right?
"self.transport" is associated with the particular connection. Also, there are a couple of other things wrong about your snippet: 1. use "write", not "writeSomeData". writeSomeData is an internal implementation detail. 2. You're actually calling writeSomeData and passing the *result* of that to callFromThread. That's wrong; you need to pass a callable and its arguments to callFromThread. So instead of reactor.callFromThread(foo(a, b)), you write reactor.callFromThread(foo, a, b). That means you should ultimately be using:: reactor.callFromThread(self.transport.write, data) -- Christopher Armstrong International Man of Twistery http://radix.twistedmatrix.com/ http://twistedmatrix.com/ http://canonical.com/ _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
participants (3)
-
Christopher Armstrong
-
Phil Mayers
-
Rutt, Benjamin