[Twisted-Python] Looking for some advice debugging issue with transport.write calls
Summary: I am having an issue writing data from an object based on twisted's LineReceiver. Calling self.transport.write from the protocol, some data makes it through, some does not. Using tcpdump, I am not seeing the missing data cross the network interface.

Ubuntu 14.04 LTS, Python 2.7.6, Twisted 15.0.0

The app receives a bunch of messages from various upstream servers (not mine) and either discards them, or bends, folds, spindles, and mutilates them and sends the modified message to a downstream server. The message format is text-based, proprietary, and not mine.

The way it works:

1. The app puts messages, each a tuple of two strings (an id and the message data), on a queue. The queue is actually a deque on the factory.

2. There's one client protocol instance for sending; it holds a connection to the downstream server. The factory is a reconnecting one, but only if the connection drops. (I'm not creating a new protocol instance/connection per message.)

3. There's a method on the protocol that checks for messages in the factory's queue and sends them, one at a time. It waits for a short ack text message back from the receiving end, or times out (my app's timeout, not a network stack one), before sending the next message, if any. Code for the sending piece is below.

4. The downstream end (an app not under my control) is not getting all the messages. If my timeout check runs and there's not been a returned ack message yet, invariably the sysadmins for the downstream system report they never got that message.

    def check_for_send(self):
        if not self.in_send_message:
            try:
                self.in_send_message = True
                msg = self.factory.queue.popleft()
                try:
                    self.waiting_id = msg[0]
                    log.msg("Sending Message: {0}".format(msg[0]))
                    raw_msg = "".join([self.MSG_START, msg[1], self.MSG_END, self.delimiter])
                    threads.deferToThread(self._dump_raw_message, msg[0], raw_msg)
                    self.transport.write(raw_msg)
                    reactor.callLater(30, self.check_for_timeout, msg[0])
                except:
                    log.msg("Error sending Message: {0}".format(msg[0]))
                    log.err()
                    self.factory.add_message(msg)
                    self.waiting_id = ""
                    self.in_send_message = False
            except IndexError:
                self.in_send_message = False
            finally:
                reactor.callLater(0, self.check_for_send)
        else:
            reactor.callLater(1, self.check_for_send)

Mostly looking for some advice on further debugging this, as whenever I've called self.transport.write on a LineReceiver over TCP before, it "just worked."

Observations:

1. The event loop doesn't appear to be getting stuck. Messages that don't arrive downstream are interleaved with those that do. And the other part of the app that is receiving messages from upstream keeps right on trucking.

2. I am now writing copies of the messages to disk; that call is right before the transport.write call, and all the "missing" messages do end up on disk.

3. I also fire the callLater for the timeout function on each message. The disk write and the callLater both happen for missing messages.

4. I ran tcpdump on the interface for about 30 minutes and matched it up with log statements. If I see in my app's log that my protocol timed out waiting for an ack message, I can't find the outgoing message on the interface.

5. I am not seeing any errors in my logs.
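For context, the factory isn't shown; a minimal sketch of the shape the sending code above assumes (the class name is made up, but queue and add_message are the pieces check_for_send relies on):

    from collections import deque

    from twisted.internet.protocol import ReconnectingClientFactory


    class SenderFactory(ReconnectingClientFactory):
        # Hypothetical name and structure; only `queue` and `add_message` are
        # what check_for_send above actually relies on.  `protocol` would point
        # at the LineReceiver subclass that does the sending.

        def __init__(self):
            self.queue = deque()  # items are (message_id, message_data) tuples

        def add_message(self, msg):
            # Enqueue a message; also used to put one back after a send error.
            self.queue.append(msg)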
On Apr 27, 2015, at 1:05 PM, Brian Costlow <brian.costlow@gmail.com> wrote:
Summary: I am having an issue writing data from an object based on twisted's LineReceiver. Calling self.transport.write from the protocol, some data makes it through, some does not. Using tcpdump, I am not seeing the missing data cross the network interface.
Nothing strikes me as obviously wrong about this code (except the "deferToThread" which seems slightly suspicious, since nothing in the example appears to have anything to do with threads, and whenever you get threads involved things get complicated). This seems to be a hugely complex example though, full of interactions with other systems, and nothing resembling a minimal example which demonstrates the behavior. I hope someone else has better luck diagnosing it, but it looks like your problem is well outside of Twisted, and you are unlikely to get an answer here unless you can provide more relevant details. I appreciate that you tried to provide lots of diagnostic information in your question, but in the absence of a program I can run, I can't tell if most of it is important or not :-\. Sending data with transport.write should, as you say, "just work". Sorry I couldn't be more helpful, and good luck, -glyph
self.in_send_message seems like a potential source of bugs (can't see it being set to False on all branches here) and is also likely unnecessary. General advice: 1. Simplify, simplify, simplify. 2. Unit tests (see https://twistedmatrix.com/documents/current/core/howto/trial.html).
On Mon, Apr 27, 2015 at 8:26 PM, Itamar Turner-Trauring <itamar@itamarst.org> wrote:
self.in_send_message seems like a potential source of bugs (can't see it being set to False on all branches here) and is also likely unnecessary.
self.in_send_message is redundant with self.waiting_id, which was added later, and will be refactored out next iteration. But after a message is removed from the queue and sent, we can't send another until either we get an ack message back or the timeout fires. The bits of code omitted here are what reset self.in_send_message back to False (or, eventually, self.waiting_id to None).

Suggestions on a better way to do this are most welcome. I thought about only having check_for_send push itself back onto the reactor loop if the queue is empty, and having the ack and/or timeout code fire check_for_send in the other cases, instead of setting the variable. Not sure that's terribly simpler, though.
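For what it's worth, one way to do that without the flag is to have the ack handler (or the timeout) kick off the next send directly, and only have check_for_send reschedule itself when the queue is empty. A rough sketch, not the code under discussion; the class and attribute names are made up, framing is reduced to sendLine, and connection teardown is ignored here:

    from twisted.internet import reactor
    from twisted.protocols.basic import LineReceiver
    from twisted.python import log


    class AckGatedSender(LineReceiver):
        # Hypothetical sketch: exactly one message is in flight, and the next
        # send is triggered by the ack handler or the timeout, rather than by
        # polling an in_send_message flag.

        ack_timeout = 30  # seconds to wait for the downstream ack

        def connectionMade(self):
            self.waiting_id = None
            self._timeout_call = None
            self.check_for_send()

        def check_for_send(self):
            try:
                msg_id, msg_data = self.factory.queue.popleft()
            except IndexError:
                # Queue empty: the only case where we reschedule ourselves.
                reactor.callLater(1, self.check_for_send)
                return
            self.waiting_id = msg_id
            log.msg("Sending Message: {0}".format(msg_id))
            self.sendLine(msg_data)
            self._timeout_call = reactor.callLater(
                self.ack_timeout, self.check_for_timeout, msg_id)

        def lineReceived(self, line):
            # Assume any line back is the ack for the in-flight message.
            if self.waiting_id is not None:
                if self._timeout_call.active():
                    self._timeout_call.cancel()
                self.waiting_id = None
                self.check_for_send()

        def check_for_timeout(self, msg_id):
            log.msg("Timed out waiting for ack of {0}".format(msg_id))
            self.waiting_id = None
            self.check_for_send()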
General advice:
1. Simplify, simplify, simplify.
2. Unit tests (see https://twistedmatrix.com/documents/current/core/howto/trial.html).
There actually is a test around this, although it's ugly, as it has to simulate the protocol and the factory's queue. It passes. This also works when we run integration tests with some third-party open source apps, but it breaks in production at one location. Point #1 is well taken, though; I should break check_for_send up and make it easier to write simple tests.
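As a starting point for simpler tests, trial plus twisted.test.proto_helpers.StringTransport lets the write path be exercised without a real connection. A self-contained sketch with a toy protocol standing in for the real one (nothing here is the actual code):

    from collections import deque

    from twisted.protocols.basic import LineReceiver
    from twisted.test import proto_helpers
    from twisted.trial import unittest


    class ToySender(LineReceiver):
        # Toy stand-in for the real protocol: pops one queued message and writes it.
        def send_one(self):
            msg_id, msg_data = self.factory.queue.popleft()
            self.transport.write(msg_data + self.delimiter)


    class FakeFactory(object):
        def __init__(self):
            self.queue = deque()


    class SendTests(unittest.TestCase):
        def setUp(self):
            self.proto = ToySender()
            self.proto.factory = FakeFactory()
            self.transport = proto_helpers.StringTransport()
            self.proto.makeConnection(self.transport)

        def test_queued_message_is_written(self):
            self.proto.factory.queue.append((b"id-1", b"hello"))
            self.proto.send_one()
            self.assertEqual(b"hello\r\n", self.transport.value())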
On Mon, Apr 27, 2015 at 4:55 PM, Glyph Lefkowitz <glyph@twistedmatrix.com> wrote:
Nothing strikes me as obviously wrong about this code (except the "deferToThread" which seems *slightly* suspicious, since nothing in the example appears to have anything to do with threads, and whenever you get threads involved things get complicated).
The deferToThread just shoves the write of the message string to a file into the thread pool. It was added after this issue was observed. Using deferToThread is a hangover from a logging callback that was attached when the file write finished; that callback has since been removed, so callInThread would work also. I didn't want to put the file write on the reactor thread here.

I wish I could find a simpler case, but frankly, I have an integration test system that uses a test app to generate upstream messages and an open source Java app to simulate the downstream server. I can't reproduce this there even with all the moving parts.

Since this is happening "in the wild" at one location, I was hoping for some advice on troubleshooting what happens between calling transport.write and seeing bytes on the wire. I guess it's time to go digging into parts of twisted I've always taken for granted, and learn something new. ;-)
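For context, _dump_raw_message isn't shown in the example; presumably it's something along these lines (the body below is a guess), with deferToThread only there to keep the blocking file write off the reactor thread:

    import os

    from twisted.internet import reactor, threads


    def dump_raw_message(msg_id, raw_msg, directory="/tmp"):
        # Hypothetical stand-in for _dump_raw_message: a blocking file write,
        # so it gets pushed onto the reactor's thread pool rather than run inline.
        path = os.path.join(directory, "outbound-{0}.msg".format(msg_id))
        with open(path, "wb") as f:
            f.write(raw_msg)


    # As in the posted code: deferToThread returns a Deferred, which mattered
    # when a logging callback was chained on it.  With that callback gone,
    # callInThread does the same job without the Deferred:
    #   threads.deferToThread(dump_raw_message, msg_id, raw_msg)
    #   reactor.callInThread(dump_raw_message, msg_id, raw_msg)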
Okay, figured this out.

Abstract of the issue: I am a dumbass.

First, Itamar gets to thrash me soundly, as there was a bug in some code (not shown in my example) that was not properly tested. That code was responsible for "turning off" the protocol instance when its connectionLost method was called, by doing some cleanup and then redefining check_for_send on the instance as a no-op to stop it from pushing itself back onto the reactor loop.

Since the factory here is a reconnecting one, if the network, the downstream server, etc., glitches, we get a *new* connected protocol instance and an old, unconnected one. Due to the bug, both are consuming messages from the factory queue and calling their transport.write with the message data. This location has some issues, so the connection occasionally drops on the downstream end, which we don't see elsewhere. transport.write on a TCP connection looks like it just returns if the underlying fd object is closed. So messages picked up by the old object get bit-bucketed.

http://twistedmatrix.com/documents/15.0.0/api/twisted.internet.tcp.Connectio...
http://twistedmatrix.com/trac/browser/tags/releases/twisted-15.0.0/twisted/i...

To do: fix the bug, write a proper unit test, fix the integration test so we test dropping connections under load...

Question: I'm assuming there's a good reason transport.write is written so it doesn't error and fails silently even though its underlying connection is not connected anymore. As part of grokking the guts of this thing I've been using for a decade... I'm curious to know why.
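The fix being described, having connectionLost stop the stale instance's send loop so only the live protocol drains the shared queue, might look roughly like this; it's a sketch, and only check_for_send and connectionLost correspond to names actually mentioned in the thread:

    from twisted.internet import reactor
    from twisted.protocols.basic import LineReceiver


    class SenderProtocol(LineReceiver):
        # Sketch of the repaired shutdown path, not the actual fix.

        def connectionMade(self):
            self._running = True
            self.check_for_send()

        def connectionLost(self, reason):
            # Once the reconnecting factory builds a new instance, this stale
            # one must stop draining the shared factory queue; otherwise it
            # keeps writing into a closed transport, which silently drops the
            # bytes.
            self._running = False

        def check_for_send(self):
            if not self._running:
                return  # do not reschedule; the live instance owns the queue now
            # ... original queue pop / transport.write / ack-timeout logic goes
            # here, with each branch rescheduling check_for_send via callLater ...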
On Apr 28, 2015, at 10:55 AM, Brian Costlow <brian.costlow@gmail.com> wrote:
Okay, figured this out.
Glad to hear it!
Abstract of the issue: I am a dumbass.
We all make mistakes. And a lot of people make this specific one :-).
First, Itamar gets to thrash me soundly,
No need for violence!
as there was a bug in some code (not shown in my example) that is not properly tested. That code was responsible for "turning off" the protocol instance if connectionLost method was called, by doing some cleanup then redefining check_for_send in the instance as a no-op to stop it from pushing itself back on the reactor loop.
Ain't it always the way. We always say "please simplify the test case" and then the asker always says "but it only happens when it's super complex". The act of simplifying the test case itself often pinpoints the problem (as it appears to have done in your case). Honestly, this was a pretty quick turnaround, and I appreciate the follow-up.
Question: I'm assuming there's a good reason transport.write is written so it doesn't error and fails silently even though its underlying connection is not connected anymore. As part of grokking the guts of this thing I've been using for a decade...I'm curious to know why.
The main reason is "predictability".

First, consider this function:

    def sendSomeCommand(self):
        self.transport.write(b"DO-SOMETHING\r\n")
        self.transport.write(b"DO-SOMETHING-ELSE\r\n")

Simple enough, right? Sends two commands?

We could easily find out between the first call to write() and the second one that the connection has dropped. Twisted optimistically invokes the send() syscall when it can (if the write buffer is empty when write() is called). This means that if the buffer happens to be empty when this method is called, and if the underlying OS already knows that the connection is closed, we might be able to invoke connectionLost between the first and second write() invocation. Since you can never really know whether those invocations are happening in isolation or together, this apparently simple function needs to turn into:

    def sendSomeCommand(self):
        if not self.transport.disconnected:
            self.transport.write(b"DO-SOMETHING\r\n")
        if not self.transport.disconnected:
            self.transport.write(b"DO-SOMETHING-ELSE\r\n")

Also, if we were to aggressively report errors like this, users of Twisted might get the impression that a successful return from write() means that the bytes were actually sent. What does "actually sent" mean? This is a surprisingly complicated question with a correspondingly complicated answer. Sent to the Ethernet card? To the local network? Acknowledged by the router on the other side? By the kernel of the OS on the other side? By the application container on the other side? And so on and so forth.

Since you already have no idea exactly where in your stream of write() calls the other side's idea of the stream has terminated unless you have application-level acknowledgement, forcing users to handle exceptions for the case where it definitely wasn't received, while still not giving them any indication that it might not have been received, is a misleading and inconvenient API.

So if you want to stop sending data to a transport whose loss you have been notified about in connectionLost, just stop calling write() :).

Hopefully this sheds some light onto transport.write's design.

-glyph
participants (3)
- Brian Costlow
- Glyph Lefkowitz
- Itamar Turner-Trauring