[Twisted-Python] how to stop dataReceived while waiting a deferred to fire?
My code is like this:

    def stringReceived(self, string):
        d = somesqlquery(string).addCallback(somecallback)

Now before I can accept more data from the next stringReceived, I have to validate the "string" with the "somecallback". I don't want to read more data from the kernel _socket_ until I have the result of somecallback.

The example in the docs is like this:

-------------------------
class FingerProtocol(basic.LineReceiver):
    def lineReceived(self, user):
        self.factory.getUser(user
        ).addErrback(lambda _: "Internal error in server"
        ).addCallback(lambda m:
                      (self.transport.write(m+"\r\n"),
                       self.transport.loseConnection()))
[..]
we would have to fetch result from a remote Oracle? Or from the web?
-------------------------

But as far as I can tell the above is broken too: it won't wait for the deferred to fire, so two of those queries could be invoked if the client side is malicious and sends two lines in one packet, and that could screw up the server. It works only as long as the clients aren't malicious. I really can't see how the reactor could ever wait for the deferred returned by getUser, or how lineReceived could be throttled in the above example. So I think the above example needs fixing too.

I need one of the two solutions below:

1) Something like protocol.transport.removeReader() to call before returning from stringReceived. (This can be allowed to differ between stringReceived and dataReceived, since stringReceived will have to stop delivery of the already-read data too; for stringReceived it could be a protocol.removeReader() that calls protocol.transport.removeReader internally.)

2) I could return a deferred from stringReceived and have the reactor deregister the fd corresponding to the transport that returned the deferred until the deferred fires. This is a less fine-grained solution, but it's quite handy to code with and it would match the Nevow style of waiting on deferreds.

I could try to work around this in my app by stacking some local buffering with removeReader in the reactor, but it's probably simpler to fix Twisted instead. There must be a way to throttle the input at the kernel level: the recv syscall must not be called while the deferred is outstanding.

Please help, thanks. I'm stuck on this right now...
On Feb 20, 2005, at 11:50 PM, Andrea Arcangeli wrote:
I assume you're using NetstringReceiver? You want to implement the Producer interface: pauseProducing/resumeProducing/stopProducing, similar to how LineReceiver implements it. (That is, submit a patch to NetstringReceiver implementing it). LineReceiver's pauseProducing does exactly what you want: 1) stops the socket from being read more and 2) doesn't call lineReceived any more even if there are more lines already buffered. James
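A minimal sketch of the two behaviours described above (stop reading from the socket, and stop delivering lines that are already buffered), written for Python 3 without importing Twisted. `FakeTransport`, `ThrottledReceiver` and the callback-style `start_query` are hypothetical stand-ins for a real transport, a LineReceiver subclass and a deferred-returning query; they are assumptions for illustration, not Twisted's code.

```python
class FakeTransport:
    """Stand-in for a transport; only records whether reading is enabled."""

    def __init__(self):
        self.reading = True

    def pauseProducing(self):
        self.reading = False

    def resumeProducing(self):
        self.reading = True


class ThrottledReceiver:
    """Toy line receiver that handles one line at a time."""

    delimiter = b"\r\n"

    def __init__(self, transport, start_query):
        self.transport = transport
        self.start_query = start_query  # hypothetical: start_query(line, done)
        self.paused = False
        self._buffer = b""
        self.results = []

    def dataReceived(self, data):
        self._buffer += data
        # Stop delivering lines as soon as a handler has paused us.
        while not self.paused and self.delimiter in self._buffer:
            line, self._buffer = self._buffer.split(self.delimiter, 1)
            self.lineReceived(line)

    def lineReceived(self, line):
        self.pauseProducing()                    # no more input until the query is done
        self.start_query(line, self.queryDone)

    def queryDone(self, result):
        self.results.append(result)
        self.resumeProducing()

    def pauseProducing(self):
        self.paused = True
        self.transport.pauseProducing()

    def resumeProducing(self):
        self.paused = False
        self.transport.resumeProducing()
        self.dataReceived(b"")                   # drain lines that were already buffered


pending = []                                     # queries that have not "fired" yet
transport = FakeTransport()
proto = ThrottledReceiver(transport, lambda line, done: pending.append((line, done)))
proto.dataReceived(b"alice\r\nbob\r\n")          # two lines in one "packet"
```

Only the first line starts a query; the second one stays in the buffer until `queryDone` runs, at which point it is delivered and the receiver pauses again.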
On Mon, Feb 21, 2005 at 12:08:43AM -0500, James Y Knight wrote:
I didn't know about this linereceiver feature, very nice. I'm using int32stringreceiver, which lacks it just like netstringreceiver (I was sure something was missing in twisted after reading all the code of int32stringreceiver), so I'll try to implement it following the linereceiver example. Many thanks for getting me on the right track!
On Mon, Feb 21, 2005 at 04:33:32PM +0100, Andrea Arcangeli wrote:
> code of int32stringreceiver), so I'll try to implement it following the linereceiver example.
Ok here we go, I even found a severe bug in the linereceiver code and I fixed it as well. The bug in linereceiver happens like this:

    resumeProducing()
        calls self.transport.resumeProducing
        calls self.dataReceived
        calls self.stringReceived
        calls self.stopProducing
        calls self.transport.stopProducing
        runs some sql query again
        calls self.transport.resumeProducing <- BUG, invalidated the above stopProducing

The above bug would for sure screw up two sql queries in a row (i.e. the deferred callback executing another sql query).

The fix is the one-liner in the patch below: it's enough to call dataReceived as the last thing in resumeProducing. (I assume that for the lower layer the effect of self.transport.resumeProducing; self.transport.stopProducing will be a no-op as long as we never schedule in between, i.e. never return in between.)

Please check the patch below into trunk. My testcase now passes perfectly (I'll leave the debug code live so that if something goes wrong I get an exception and the client is disconnected).

I was too lazy to implement it for netstringreceiver (to me netstringreceiver looks mostly like a waste of bandwidth and an unnecessary complexity ;). I guess netstringreceiver is more ngrep friendly, but I can't debug with ngrep anyway since it's all under ssl with certificate validation here. But at least now the reentrancy bug is fixed and it should be easy to extend to netstringreceiver if needed.

Thanks for the help!
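The ordering problem can be reproduced in isolation. The sketch below is an illustration only: `FakeTransport` and `Receiver` are hypothetical stand-ins written for Python 3, not the code in twisted/protocols/basic.py. The receiver pauses itself on every delivered item, the way a handler waiting on a query would, and `run` reports the receiver's `paused` flag next to the transport's reading state after one resume.

```python
class FakeTransport:
    """Stand-in for a transport; only records whether reading is enabled."""

    def __init__(self):
        self.reading = True

    def pauseProducing(self):
        self.reading = False

    def resumeProducing(self):
        self.reading = True


class Receiver:
    """Pauses itself on every delivered item, like a handler awaiting a query."""

    def __init__(self, transport, fixed):
        self.transport = transport
        self.fixed = fixed          # True: resume the transport first, then drain
        self.paused = False
        self.pending = []
        self.delivered = []

    def dataReceived(self, items):
        self.pending.extend(items)
        while self.pending and not self.paused:
            self.delivered.append(self.pending.pop(0))
            self.pauseProducing()   # handler wants quiet until its query finishes

    def pauseProducing(self):
        self.paused = True
        self.transport.pauseProducing()

    def resumeProducing(self):
        self.paused = False
        if self.fixed:
            self.transport.resumeProducing()
            self.dataReceived([])             # may pause again, and that pause sticks
        else:
            self.dataReceived([])             # may pause again...
            self.transport.resumeProducing()  # ...but this call undoes the pause


def run(fixed):
    transport = FakeTransport()
    receiver = Receiver(transport, fixed)
    receiver.dataReceived(["q1", "q2"])       # q1 is delivered, q2 stays buffered
    receiver.resumeProducing()                # q2 is delivered and pauses again
    return receiver.paused, transport.reading
```

With the old order the receiver believes it is paused while the transport keeps reading; with the drain moved to the end, the two stay in agreement.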
Index: Twisted/twisted/protocols/basic.py
===================================================================
--- Twisted/twisted/protocols/basic.py  (revision 13001)
+++ Twisted/twisted/protocols/basic.py  (working copy)
@@ -166,9 +166,23 @@
         """
         return error.ConnectionLost('Line length exceeded')
 
+class PauseProducer(object):
+    paused = False
 
+    def pauseProducing(self):
+        self.paused = True
+        self.transport.pauseProducing()
 
-class LineReceiver(protocol.Protocol):
+    def resumeProducing(self):
+        self.paused = False
+        self.transport.resumeProducing()
+        self.dataReceived('')
+
+    def stopProducing(self):
+        self.paused = True
+        self.transport.stopProducing()
+
+class LineReceiver(protocol.Protocol, PauseProducer):
     """A protocol that receives lines and/or raw data, depending on mode.
 
     In line mode, each line that's received becomes a callback to
@@ -188,7 +202,6 @@
     __buffer = ''
     delimiter = '\r\n'
     MAX_LENGTH = 16384
-    paused = False
 
     def clearLineBuffer(self):
         """Clear buffered data."""
@@ -279,21 +292,8 @@
         """
         return self.transport.loseConnection()
 
-    def pauseProducing(self):
-        self.paused = True
-        self.transport.pauseProducing()
 
-    def resumeProducing(self):
-        self.paused = False
-        self.dataReceived('')
-        self.transport.resumeProducing()
-
-    def stopProducing(self):
-        self.paused = True
-        self.transport.stopProducing()
-
-
-class Int32StringReceiver(protocol.Protocol):
+class Int32StringReceiver(protocol.Protocol, PauseProducer):
     """A receiver for int32-prefixed strings.
 
     An int32 string is a string prefixed by 4 bytes, the 32-bit length of
@@ -314,7 +314,7 @@
         """Convert int32 prefixed strings into calls to stringReceived.
         """
         self.recvd = self.recvd + recd
-        while len(self.recvd) > 3:
+        while len(self.recvd) > 3 and not self.paused:
             length ,= struct.unpack("!i",self.recvd[:4])
             if length > self.MAX_LENGTH:
                 self.transport.loseConnection()
@@ -331,7 +331,7 @@
         self.transport.write(struct.pack("!i",len(data))+data)
 
 
-class Int16StringReceiver(protocol.Protocol):
+class Int16StringReceiver(protocol.Protocol, PauseProducer):
     """A receiver for int16-prefixed strings.
 
     An int16 string is a string prefixed by 2 bytes, the 16-bit length of
@@ -351,7 +351,7 @@
         """Convert int16 prefixed strings into calls to stringReceived.
         """
         self.recvd = self.recvd + recd
-        while len(self.recvd) > 1:
+        while len(self.recvd) > 1 and not self.paused:
             length = (ord(self.recvd[0]) * 256) + ord(self.recvd[1])
             if len(self.recvd) < length+2:
                 break
On Mon, 21 Feb 2005 18:46:18 +0100, Andrea Arcangeli <andrea@cpushare.com> wrote:
> Ok here we go, I even found a severe bug in the linereceiver code and I fixed it as well. The bug in linereceiver happens like this:
Any chance that you could include some unit tests for the behavior you're expecting? I think it might need to be fixed a different way. For example:
This seems like it could lead to some unpleasantly large buffer strings (built by quadratic-time concatenation) being left around if you pause one of these receivers and then forget to unpause it, or even just if your peer happens to be sending data very rapidly.
On Mon, Feb 21, 2005 at 08:47:57PM +0000, glyph@divmod.com wrote:
Using the echoserver/client as a base should be good enough to test this. From the client send a flood of lines, and from the server call pauseProducing() after the first line has been received. Then verify the server doesn't receive the other lines and that they sit in the tcp recv queue.
Won't the data be freed along with the protocol when the connection is closed? The protocol should go away when the connection is closed.
> just if your peer happens to be sending data very rapidly.
The upper bound on in-flight receive data for each int32 protocol is set to 99999 (protocol.MAX_LENGTH), and the reactor will read with a limited buffer too. So I don't see any problem as long as transport.pauseProducing() does what it's expected to do (i.e. stop polling the socket and stop receiving data from the wire). Anyway, if there's still a problem, let's ignore my problem and focus only on the LineReceiver implementation that is already in current twisted. As long as LineReceiver works my patch is going to be fine. There was a reentrancy bug in LineReceiver but I already fixed it.
On Mon, 21 Feb 2005 22:36:44 +0100, Andrea Arcangeli <andrea@cpushare.com> wrote:
That's not a unit test. Unit tests are described here: http://c2.com/cgi/wiki?UnitTest Jp
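For illustration, the behaviour described earlier in the thread (pause after the first line, verify the remaining lines are not delivered) could be expressed as an isolated test without a real socket. This is only a sketch under assumptions: `FakeTransport` and `LineBuffer` are hypothetical stand-ins written for Python 3, not Twisted classes, and a real test would exercise the receivers in twisted/protocols/basic.py instead.

```python
import unittest


class FakeTransport:
    """Stand-in for a transport; only records whether reading is enabled."""

    def __init__(self):
        self.reading = True

    def pauseProducing(self):
        self.reading = False

    def resumeProducing(self):
        self.reading = True


class LineBuffer:
    """Minimal receiver under test (a stand-in, not Twisted's LineReceiver)."""

    def __init__(self, transport):
        self.transport = transport
        self.paused = False
        self.buffer = b""
        self.lines = []

    def dataReceived(self, data):
        self.buffer += data
        while not self.paused and b"\r\n" in self.buffer:
            line, self.buffer = self.buffer.split(b"\r\n", 1)
            self.lines.append(line)
            if len(self.lines) == 1:
                self.pauseProducing()   # pause after the first line only

    def pauseProducing(self):
        self.paused = True
        self.transport.pauseProducing()

    def resumeProducing(self):
        self.paused = False
        self.transport.resumeProducing()
        self.dataReceived(b"")          # deliver what was buffered while paused


class PauseTest(unittest.TestCase):
    def setUp(self):
        self.transport = FakeTransport()
        self.proto = LineBuffer(self.transport)

    def test_pause_stops_delivery(self):
        self.proto.dataReceived(b"one\r\ntwo\r\nthree\r\n")
        self.assertEqual(self.proto.lines, [b"one"])
        self.assertFalse(self.transport.reading)

    def test_resume_delivers_buffered_lines(self):
        self.proto.dataReceived(b"one\r\ntwo\r\nthree\r\n")
        self.proto.resumeProducing()
        self.assertEqual(self.proto.lines, [b"one", b"two", b"three"])
        self.assertTrue(self.transport.reading)
```

Whether the unread bytes really stay in the kernel's receive queue cannot be checked this way; that part depends on the real transport.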
On Tue, Feb 22, 2005 at 03:33:22AM +0000, Jp Calderone wrote:
> That's not a unit test. Unit tests are described here:
I have no time to write one, sorry.
On Tue, Feb 22, 2005 at 04:48:43AM +0100, Andrea Arcangeli wrote:
I've filed http://twistedmatrix.com/bugs/issue899 so this issue doesn't get forgotten. -Andrew.
On Tue, Feb 22, 2005 at 03:35:40PM +1100, Andrew Bennetts wrote:
> I've filed http://twistedmatrix.com/bugs/issue899 so this issue doesn't get forgotten.
Thanks a lot!

What pauseProducing does is fine for my objective of stopping the client and restarting it from both errback and callback, but it is possibly too aggressive. Along the lines of glyph's email, sometimes what could be needed is an option to mask _only_ POLLIN, not necessarily all the other bitflags, so that I could still receive a disconnect event. In some ways it might be good not to get a disconnect during a deferred, but OTOH the fact that it doesn't notice a disconnect prevents me from leaving pauseProducing enabled for an extended period of time, which in theory I could otherwise do (I can't, or I would lose the disconnect event).

But overall this would be an additional (minor) feature, quite orthogonal to the "hard" approach of pauseProducing. Having the API of int32/int16 in sync with linereceiver sounds like a better approach for the short term, and the reentrancy bugfix sounds good to have.

Thanks everyone for the help, and sorry for not having much time to provide a unit test.
This patch adds a check for transport.disconnecting to the int16/32 protocols, so that they don't keep firing stringReceived callbacks after calling transport.loseConnection (in line with the linereceiver protocol). This also adds the methods for pauseproducer in line with the linereceiver protocol (as per previous patch in this thread).

If these two fixes to int32/16 are correct please apply to SVN. Thanks.

Index: Twisted/twisted/protocols/basic.py
===================================================================
--- Twisted/twisted/protocols/basic.py  (revision 13177)
+++ Twisted/twisted/protocols/basic.py  (working copy)
@@ -166,9 +166,23 @@
         """
         return error.ConnectionLost('Line length exceeded')
 
+class PauseProducer(object):
+    paused = False
 
+    def pauseProducing(self):
+        self.paused = True
+        self.transport.pauseProducing()
 
-class LineReceiver(protocol.Protocol):
+    def resumeProducing(self):
+        self.paused = False
+        self.transport.resumeProducing()
+        self.dataReceived('')
+
+    def stopProducing(self):
+        self.paused = True
+        self.transport.stopProducing()
+
+class LineReceiver(protocol.Protocol, PauseProducer):
     """A protocol that receives lines and/or raw data, depending on mode.
 
     In line mode, each line that's received becomes a callback to
@@ -188,7 +202,6 @@
     __buffer = ''
     delimiter = '\r\n'
     MAX_LENGTH = 16384
-    paused = False
 
     def clearLineBuffer(self):
         """Clear buffered data."""
@@ -279,21 +292,8 @@
         """
         return self.transport.loseConnection()
 
-    def pauseProducing(self):
-        self.paused = True
-        self.transport.pauseProducing()
 
-    def resumeProducing(self):
-        self.paused = False
-        self.dataReceived('')
-        self.transport.resumeProducing()
-
-    def stopProducing(self):
-        self.paused = True
-        self.transport.stopProducing()
-
-
-class Int32StringReceiver(protocol.Protocol):
+class Int32StringReceiver(protocol.Protocol, PauseProducer):
     """A receiver for int32-prefixed strings.
 
     An int32 string is a string prefixed by 4 bytes, the 32-bit length of
@@ -314,7 +314,7 @@
         """Convert int32 prefixed strings into calls to stringReceived.
         """
         self.recvd = self.recvd + recd
-        while len(self.recvd) > 3:
+        while len(self.recvd) > 3 and not self.paused and not self.transport.disconnecting:
             length ,= struct.unpack("!i",self.recvd[:4])
             if length > self.MAX_LENGTH:
                 self.transport.loseConnection()
@@ -331,7 +331,7 @@
         self.transport.write(struct.pack("!i",len(data))+data)
 
 
-class Int16StringReceiver(protocol.Protocol):
+class Int16StringReceiver(protocol.Protocol, PauseProducer):
     """A receiver for int16-prefixed strings.
 
     An int16 string is a string prefixed by 2 bytes, the 16-bit length of
@@ -351,7 +351,7 @@
         """Convert int16 prefixed strings into calls to stringReceived.
         """
         self.recvd = self.recvd + recd
-        while len(self.recvd) > 1:
+        while len(self.recvd) > 1 and not self.paused and not self.transport.disconnecting:
             length = (ord(self.recvd[0]) * 256) + ord(self.recvd[1])
             if len(self.recvd) < length+2:
                 break
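To show what the extra loop conditions buy, here is a small self-contained sketch of int32 length-prefixed framing in Python 3. It is not the Twisted implementation: `FakeTransport`, `Int32Framer` and `frame` are hypothetical names, the `disconnecting` flag is simply set by the fake's `loseConnection`, and the "quit" handler exists only to trigger a disconnect in the middle of a buffer.

```python
import struct


class FakeTransport:
    """Stand-in transport exposing a 'disconnecting' flag."""

    disconnecting = False

    def loseConnection(self):
        self.disconnecting = True


class Int32Framer:
    """Toy receiver for strings prefixed by a 4-byte big-endian length."""

    MAX_LENGTH = 99999

    def __init__(self, transport):
        self.transport = transport
        self.paused = False
        self.recvd = b""
        self.strings = []

    def dataReceived(self, data):
        self.recvd += data
        # Stop delivering once paused or once the connection is going away.
        while (len(self.recvd) > 3 and not self.paused
               and not self.transport.disconnecting):
            (length,) = struct.unpack("!i", self.recvd[:4])
            if length > self.MAX_LENGTH:
                self.transport.loseConnection()
                return
            if len(self.recvd) < length + 4:
                break                            # wait for the rest of the string
            packet = self.recvd[4:length + 4]
            self.recvd = self.recvd[length + 4:]
            self.stringReceived(packet)

    def stringReceived(self, string):
        self.strings.append(string)
        if string == b"quit":
            self.transport.loseConnection()      # later strings must not be delivered


def frame(payload):
    """Prefix payload with its length as a 4-byte big-endian integer."""
    return struct.pack("!i", len(payload)) + payload


framer = Int32Framer(FakeTransport())
framer.dataReceived(frame(b"hello") + frame(b"quit") + frame(b"ignored"))
```

Without the `disconnecting` check the third string would still reach `stringReceived` after the handler asked for the connection to be closed.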
On Sun, 13 Mar 2005 20:57:09 +0100, Andrea Arcangeli <andrea@cpushare.com> wrote:
transport.disconnecting is horrible (it's an implementation detail, it doesn't work on a lot of transports). LineReceiver shouldn't be using it. The feature LineReceiver uses it to implement may be reasonable, though, which is why its code is still how it is. If the feature is desired somewhere else, the best way to add it is to either find another way that doesn't rely on transport.disconnecting (and then we can fix LineReceiver too), or factor the code such that all protocols can provide the feature without having more than one method that relies on transport.disconnecting (then we can fix them all at once someday). Jp
On Mon, Mar 14, 2005 at 12:58:47PM +0000, Jp Calderone wrote:
If there's no real generic API and you're not planning on adding one soon, perhaps we could check for it with hasattr and emit a warning if the transport doesn't provide .disconnecting?
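A minimal sketch of that check, assuming a helper named `is_disconnecting` (a hypothetical name, not a Twisted function) and the standard `warnings` module. The two small classes are placeholders for a transport that has the attribute and one that does not.

```python
import warnings


def is_disconnecting(transport):
    """Return transport.disconnecting, warning if the attribute is missing."""
    if not hasattr(transport, "disconnecting"):
        warnings.warn(
            "%s has no 'disconnecting' attribute; assuming False"
            % type(transport).__name__,
            RuntimeWarning,
            stacklevel=2,
        )
        return False
    return bool(transport.disconnecting)


class TcpLike:
    """Placeholder for a transport that provides the attribute."""
    disconnecting = True


class Bare:
    """Placeholder for a transport that does not."""
```

Treating a missing attribute as "not disconnecting" keeps the receive loop running on such transports, which is the old behaviour; the warning only makes the gap visible.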
On Tue, Mar 15, 2005 at 12:29:35PM -0500, Itamar Shtull-Trauring wrote:
> We probably ought to add an API, since the functionality appears to be useful.
What about just adding .disconnecting to the other transports too then? That already exists and I don't see anything fundamentally wrong with it (especially with the property feature of python that won't leave us locked into a fixed value). Then incidentally my patch will become correct too ;).
On Tue, 15 Mar 2005 19:14:52 +0100, Andrea Arcangeli <andrea@cpushare.com> wrote:
The right way would be to add a 'state' attribute to all transports and have one possible value of it be 'disconnecting' (or DISCONNECTING, whatever ;). The disconnecting attribute is problematic as a symptom of a larger internal problem (a lack of clarity as to the possible states and state transitions a transport can be in and go through). Jp
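One way to picture such a state attribute, purely as an illustration: the state names other than "disconnecting", the transition table and the `StatefulTransport` class below are all assumptions, not an existing Twisted API. The read-only `disconnecting` property shows how the old attribute could be kept as a view on the state, in the spirit of the property suggestion earlier in the thread.

```python
from enum import Enum


class TransportState(Enum):
    CONNECTING = "connecting"
    CONNECTED = "connected"
    DISCONNECTING = "disconnecting"
    DISCONNECTED = "disconnected"


# Allowed transitions; anything else is treated as a programming error.
TRANSITIONS = {
    TransportState.CONNECTING: {TransportState.CONNECTED, TransportState.DISCONNECTED},
    TransportState.CONNECTED: {TransportState.DISCONNECTING, TransportState.DISCONNECTED},
    TransportState.DISCONNECTING: {TransportState.DISCONNECTED},
    TransportState.DISCONNECTED: set(),
}


class StatefulTransport:
    """Hypothetical transport that tracks an explicit connection state."""

    def __init__(self):
        self.state = TransportState.CONNECTING

    def _transition(self, new_state):
        if new_state not in TRANSITIONS[self.state]:
            raise ValueError(
                "illegal transition %s -> %s" % (self.state.name, new_state.name)
            )
        self.state = new_state

    def connectionMade(self):
        self._transition(TransportState.CONNECTED)

    def loseConnection(self):
        self._transition(TransportState.DISCONNECTING)

    def connectionLost(self):
        self._transition(TransportState.DISCONNECTED)

    @property
    def disconnecting(self):
        # Backwards-compatible view for code that still reads the old flag.
        return self.state is TransportState.DISCONNECTING
```

Making the transitions explicit is what addresses the "lack of clarity" point: an unexpected sequence of calls fails loudly instead of leaving a flag in an ambiguous state.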
On Mon, Feb 21, 2005 at 05:50:16AM +0100, Andrea Arcangeli wrote: [...]
I don't think it matters if the two lines arrive in the same packet or not; if the lines arrive before the getUser deferred fires, it will fire multiple queries, regardless of the exact packets involved. In this case maybe a flag should be set in the protocol after the first lineReceived event... any future rawDataReceived events (after calling self.setRawMode()) could then be ignored. Or if you're more paranoid, it could drop the connection and try to cancel the in-progress getUser operation.
I think the transport's pauseProducing call will do what you want, here. And resumeProducing when you're ready again. -Andrew.
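A sketch of the flag idea in its "paranoid" form, for illustration only. It keeps everything in `lineReceived` rather than switching to raw mode, replaces the deferred with a plain callback so that it runs without Twisted, and does not attempt to cancel the in-progress lookup. `FakeTransport`, `OneShotFinger` and `lookup` are hypothetical names.

```python
class FakeTransport:
    """Stand-in transport that records writes and the connection state."""

    def __init__(self):
        self.written = []
        self.connected = True

    def write(self, data):
        self.written.append(data)

    def loseConnection(self):
        self.connected = False


class OneShotFinger:
    """Accepts one request per connection; a second line drops the client."""

    def __init__(self, transport, lookup):
        self.transport = transport
        self.lookup = lookup        # hypothetical: lookup(user, callback)
        self.busy = False

    def lineReceived(self, user):
        if self.busy:
            # A second request while the first is outstanding: drop the client.
            self.transport.loseConnection()
            return
        self.busy = True
        self.lookup(user, self._reply)

    def _reply(self, message):
        if not self.transport.connected:
            return                  # connection already dropped, nothing to write
        self.transport.write(message + b"\r\n")
        self.transport.loseConnection()


pending = []                        # lookups that have not completed yet
transport = FakeTransport()
proto = OneShotFinger(transport, lambda user, cb: pending.append((user, cb)))
proto.lineReceived(b"alice")
proto.lineReceived(b"bob")          # arrives before the first lookup completes
```

Only one lookup is started; the late reply for it is discarded because the connection was already dropped.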
On Mon, Feb 21, 2005 at 04:13:53PM +1100, Andrew Bennetts wrote:
Well, it will raise an exception in transport.write at the very least, and in theory unhandled exceptions shouldn't happen. But the reason I suggested fixing it is to give people the right mindset: unless they know what they're doing, they should always call LineReceiver.pauseProducing before executing getUser(user), otherwise a _real_ app that copied the above code may go wrong in more ways than just an exception.
> I think the transport's pauseProducing call will do what you want, here. And resumeProducing when you're ready again.
Yes, if I were using unbuffered input. Thanks a lot for the help!
participants (6)
- Andrea Arcangeli
- Andrew Bennetts
- glyph@divmod.com
- Itamar Shtull-Trauring
- James Y Knight
- Jp Calderone