[Twisted-Python] How do you determine the buffer size of a transport - a use-case for not using back pressure
Twisted Community Problem: How do you determine the buffer size of a transport, to know how much data is waiting to be transmitted after calling transport.write?

Wait! You're going to say: use the Producer/Consumer API ( http://twistedmatrix.com/documents/current/core/howto/producers.html )

To do what: So that instead of using back pressure, I can check the buffer, and when it's "too big/full" decide to do something about the transport I am writing to.

Slow-transport handling options:
- Buffer to disk instead of memory
- Kill the transport
- Decide to skip sending some data
- Send an error or message to the peer I am writing to
- Reduce the resolution or increase the compression (for things like video or audio)

Why not use back pressure? For some use-cases and some protocols it doesn't make sense. Examples:
- You're sending video, and if the receiver can't keep up you want to downgrade (or upgrade) the quality of the video, but you can't decide that without knowing how much is buffered.
- You're receiving from one connection and broadcasting what you received to multiple clients, and you want to handle a client that doesn't keep up by sending it an error.
- You're sending a real-time protocol and want to skip sending data that's no longer relevant when the buffer is too full.

On a server, what are the consequences? Too much buffering across many transport write buffers can cause a server to fail. I don't know how to track this proactively without access to the buffer sizes. Possible resolutions: stop accepting new connections when memory pressure is too high, kill the connections of the weakest/slowest clients, or use a protocol that can instruct clients to switch to new servers to spread out the load.

1) I would like to hear how people would solve this sort of problem in Twisted for a server.
2) I would also like to hear people's opinions on this in general.
Tobias Oberstein - BCC'ed you on this email because you seem to have tackled similar problems (based on the mailing list) and I would really love to get your take on this too. Glyph and Jean-Paul, you're also big contributors on those threads, so any opinions you have would be appreciated as well.

Some of my background research:

* http://twistedmatrix.com/pipermail/twisted-python/2015-January/029064.html
  * Later but good in the chain: http://twistedmatrix.com/pipermail/twisted-python/2015-January/029071.html
  * "Twisted receiving buffers swamped?"
  * Summary: Great thread, but it runs into a tangent for a while and picks up again at the end, discussing producers and the need for backpressure
  * Other: http://crossbar.io/ , http://tavendo.com/
  * Scenario: "Twisted is reading off the TCP stack from the kernel and buffering in userspace faster than the echo server is pushing out stuff to the TCP stack into the kernel. Hence, no TCP backpressure results, netperf happily sends more and more, and the memory of the Twisted process runs away."
  * Confirmed: data isn't buffered in "userspace inside Twisted, and before data is handled by the app in dataReceived."
* https://twistedmatrix.com/pipermail/twisted-python/2010-September/022900.htm...
  * "How to cap the buffering size of data to be sent in Protocol class"
  * Summary: Same issue; very short, no useful info
* https://twistedmatrix.com/pipermail/twisted-python/2012-March/025416.html
  * "Limit on transport.write"
  * Summary: Similar issue; very short, no useful info. Glyph confirms that transport.write buffers everything sent to it.
* https://twistedmatrix.com/pipermail/twisted-python/2012-February/025215.html
  * "pushing out same message on 100k TCPs"
  * Summary: Interesting discussion but a different issue; interesting aside about IRC spanning trees for large broadcasts
* http://twistedmatrix.com/pipermail/twisted-python/2008-April/017503.html
  * Question on push/pull producer inner workings; was: "Is there a simple Producer/Consumer example or tutorial?"
  * Summary: Related; goes into what a producer is, with an explanation of it
* http://twistedmatrix.com/pipermail/twisted-python/2004-May/007732.html
  * "When do calls to transport.write() block?"
  * Summary: Discusses the right issue; talks about a buffer-full callback, which would be great (if configurable)
* https://pythonhosted.org/qbuf/qbuf.twisted_support.MultiBufferer-class.html
  * https://launchpad.net/qbuf
  * Summary: C buffer implementation that's incomplete, though it might be promising
* http://stackoverflow.com/questions/21821119/twisted-producer-to-many-clients...
  * Summary: Indicated a similar use-case, but the source doesn't seem to exist on the internet anymore
* http://twistedmatrix.com/documents/current/core/howto/producers.html
  * Summary: Documentation on producers and consumers, but it only helps with a backpressure scenario
* http://twistedmatrix.com/pipermail/twisted-python/2007-July/015690.html
  * Summary: Discussion of how the producer-consumer APIs work and future enhancements; no help
* Many more that were 100% not relevant

--
Steve Morin | Hacker, Entrepreneur, Startup Advisor
twitter.com/SteveMorin | stevemorin.com
*Live the dream, start a startup. Make the world ... a better place.*
Does anyone know how to determine the buffer size of a transport, to know how much data is waiting to be transmitted after calling transport.write? Or how you would go about adding that ability to a reactor/transport? On Wed, Aug 17, 2016 at 3:43 PM, Steve Morin <steve.morin@gmail.com> wrote:
--
*Steve Morin | Managing Partner - CTO*
*Nvent*
O 800-407-1156 ext 803 | M 347-453-5579
smorin@nventdata.com
*Enabling the Data Driven Enterprise*
*(Ask us how we can set up scalable open source realtime billion+ event/data collection/analytics infrastructure in weeks)*
Service Areas: Management & Strategy Consulting | Data Engineering | Data Science & Visualization
There are at least two buffers (per direction) you might be interested in. You can get the kernel buffer size with getsockopt - SO_SNDBUF and SO_RCVBUF. Then, there may also be a user-space buffer (or perhaps more than one) managed by the transport implementation. The details of this are entirely up to the transport so there's no general answer apart from "read the implementation, contribute a patch implementing a public interface for retrieving the information". Jean-Paul On Fri, Aug 19, 2016 at 10:08 AM, Steve Morin <steve@stevemorin.com> wrote:
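Jean-Paul's kernel-side suggestion can be sketched with the standard library alone. This is a sketch, not a Twisted API: the helper names are assumptions, and the SIOCOUTQ ioctl (bytes written to the kernel but not yet transmitted) is Linux-specific. Neither call sees Twisted's own user-space write buffer.

```python
# Querying the kernel-side socket buffers (not Twisted's user-space buffer).
# SO_SNDBUF / SO_RCVBUF are portable; unsent_bytes() is Linux-only.
import socket

def kernel_buffer_sizes(sock):
    """Return (send_buffer_size, recv_buffer_size) as reported by the kernel."""
    sndbuf = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
    rcvbuf = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
    return sndbuf, rcvbuf

def unsent_bytes(sock):
    """Linux only: bytes accepted by the kernel but not yet sent on the wire."""
    import fcntl, struct
    SIOCOUTQ = 0x5411  # from <linux/sockios.h>
    result = fcntl.ioctl(sock.fileno(), SIOCOUTQ, struct.pack("i", 0))
    return struct.unpack("i", result)[0]
```

Twisted's TCP transport does not publicly expose its underlying socket object, which is exactly why getting at this from a protocol would need the kind of public interface Jean-Paul suggests contributing.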
On 17/08/16 23:43, Steve Morin wrote:
You haven't received many responses, so I'll add a short note here: The ideal way to do this, IMO, is to have the receiver send acknowledgements with a sequence number to the sender once a message has been received *and processed*. Keep track of these at the sender and adjust the sending rate accordingly. Another, complementary approach is for the client to keep track of the length (in space, or in time/age) of its local received-but-not-processed queue and report this. Keeping track of the queue in terms of delay is close to the CoDel approach for AQM. Depending on the network portion of the implementation, the mere fact of a message having been received and TCP-ack'd doesn't tell you the client is keeping up - it might be receiving and buffering into RAM and getting more and more backlogged. If you can't alter the protocol to include protocol-level ack messages, then obviously you have to go the less satisfactory route and estimate it from the lower-layer transport. As JP has noted, you can get this from the kernel. Flow control is hard ;o)
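Phil's acknowledge-after-processing scheme can be sketched as a sender-side window: number each message, record it as outstanding until the receiver acks it, and throttle (or downgrade quality, drop stale data, etc.) while too many are outstanding. All names and the threshold below are illustrative assumptions, not any Twisted API.

```python
# Hypothetical sender-side ack window: the receiver acks a sequence number
# only after *processing* a message, so the number of outstanding messages
# approximates how far behind that receiver is.
class AckWindow:
    def __init__(self, max_outstanding=64):
        self.max_outstanding = max_outstanding
        self.next_seq = 0
        self.outstanding = set()

    def can_send(self):
        # False means the peer is backlogged: slow down, skip, or degrade.
        return len(self.outstanding) < self.max_outstanding

    def record_send(self):
        # Call when a message is written; returns its sequence number.
        seq = self.next_seq
        self.next_seq += 1
        self.outstanding.add(seq)
        return seq

    def record_ack(self, seq):
        # Call when the peer reports it has processed message `seq`.
        self.outstanding.discard(seq)
```

The sender consults can_send() before each write; a fuller version would also timestamp sends, giving the delay-based view Phil compares to CoDel.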
Hi Steve, It looks like I had marked this message as interesting and warranting a reply, but never got around to it. I'm sorry it's been quite a while! I appreciate the amount of research you did here :-).
This is, unfortunately, the only solution :).
To do what: So that instead of using back pressure I can check the buffer and when it's "too big/full" can decide to do something to the transport I am writing to:
I think when you say "back pressure" you're referring to your program exerting back-pressure on its peer. I understand why you don't want to do that. However, there's another kind of back pressure - your peer exerting back pressure on your program. Commensurately, there are two ways to use back pressure: To exert back pressure on your peer, call `self.transport.pauseProducing()`. Later, when you're ready to receive more data, call `self.transport.resumeProducing()`. This is what you don't want to do. To detect when back pressure is applied from your peer, call `self.transport.registerProducer(self, True)`; then the reactor will call pauseProducing() when its buffer is full and resumeProducing() when it empties out again. Your list of things you might want to do here:
is a good one, and all of these things can be achieved. Going through them: If you want to buffer to disk instead of memory, have methods like:

```python
def someDataToSendToPeer(self, someData):
    if self.isProducing:
        self.transport.write(someData)
    else:
        self.bufferFile.write(someData)

def pauseProducing(self):
    self.isProducing = False
    self.bufferFile = open("buffer.file", "wb")

def resumeProducing(self):
    self.isProducing = True
    self.startUnbufferingFromFile()
```

If you want to kill the transport:

```python
def pauseProducing(self):
    self.transport.abortConnection()
```

If you want to reduce video stream quality:

```python
def streamSomeRawVideo(self, someRawVideo):
    if self.isProducing:
        self.transport.write(self.videoBuffer.addAndEncodeToBytes(someRawVideo))
    else:
        self.videoBuffer.addAndCompressSomeMore(someRawVideo)
```

and so on, and so on. Basically, you can treat the buffer as "empty" until pauseProducing() is called. Once it is, you can treat it as "full". Hope this was helpful, and still timely enough for you to make some use of it :). -glyph
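Glyph's buffer-to-disk fragment can be filled out into a self-contained sketch of the control flow. This is an illustration only: FakeTransport and an in-memory buffer stand in for a real Twisted transport and an on-disk file, and the class name is hypothetical.

```python
# Illustration of the pause/resume buffering pattern: while "producing",
# writes go straight to the transport; after pauseProducing() they are
# spooled to a buffer; resumeProducing() flushes the backlog.
import io

class FakeTransport:
    """Stand-in for a Twisted transport; records what was written."""
    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)

class BufferingSender:
    def __init__(self, transport):
        self.transport = transport
        self.isProducing = True
        self.bufferFile = None

    def someDataToSendToPeer(self, someData):
        if self.isProducing:
            self.transport.write(someData)
        else:
            self.bufferFile.write(someData)

    def pauseProducing(self):
        self.isProducing = False
        # A real implementation might open a file on disk here instead.
        self.bufferFile = io.BytesIO()

    def resumeProducing(self):
        self.isProducing = True
        backlog, self.bufferFile = self.bufferFile.getvalue(), None
        if backlog:
            self.transport.write(backlog)
```

In real Twisted code, `self.transport.registerProducer(self, True)` is what arranges for the reactor to call pauseProducing() and resumeProducing() on your protocol as its buffer fills and drains.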
participants (5)
- Glyph Lefkowitz
- Jean-Paul Calderone
- Phil Mayers
- Steve Morin
- Steve Morin