[Tutor] Buffer code review

richard kappler richkappler at gmail.com
Fri Feb 12 10:16:47 EST 2016


I have a two-script test environment set up to simulate a controller in
production. The controller creates XML data from a camera tunnel as packages
roll through it and sends these XML messages over TCP to a different machine.

To simulate this in our test environment, I take a log of all the XML
messages and read them line by line with Script1, which writes them to
another log with a time.sleep(0.1) between lines to simulate the rate at
which packages pass through the camera tunnel.
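As a rough illustration of the replay step described above, here is a minimal sketch. The function name replay_log and its parameters are hypothetical and are not taken from the actual Script1; the flush after each write is an assumption about how the writer behaves, not something the description confirms.

```python
import time


def replay_log(src_path, dst_path, delay=0.1):
    """Copy src_path to dst_path one line at a time, pausing between lines.

    Hypothetical helper: the names and the flush() call are assumptions.
    Returns the number of lines copied.
    """
    count = 0
    with open(src_path, 'r') as src, open(dst_path, 'a') as dst:
        for line in src:
            dst.write(line)
            # Hand the line to the OS right away so a reader polling the
            # file is less likely to wait on Python's internal write buffer.
            dst.flush()
            time.sleep(delay)
            count += 1
    return count
```

A call such as replay_log('recorded.log', 'xmldata.log') would replay at roughly ten lines per second; 'recorded.log' is a made-up file name.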

Script2 checks the file size to see whether the file has new data; if so, it
reads the data, adds an STX and ETX, then sends it over TCP to a machine
running a Splunk instance for data collection and analysis.
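The framing step might be sketched as follows. This is only an illustration under assumptions: frame_message and send_framed are hypothetical names, the UTF-8 encoding is assumed, and sendall is used here as one way to make sure the whole frame is written to the socket.

```python
STX = b'\x02'  # start-of-text marker
ETX = b'\x03'  # end-of-text marker


def frame_message(line):
    """Return one log line wrapped in STX/ETX as bytes.

    Only trailing CR/LF characters are removed, so a line that arrives
    without a newline keeps its last character.
    """
    payload = line if isinstance(line, bytes) else line.encode('utf-8')
    return STX + payload.rstrip(b'\r\n') + ETX


def send_framed(sock, line):
    """Send one framed message over an already connected socket."""
    # sendall() keeps writing until every byte of the frame has been sent.
    sock.sendall(frame_message(line))
```

Stripping with rstrip removes only line-ending characters, whereas a slice such as line[:-1] drops the final character whatever it is.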

I will append the script below, but here is the problem. I included a
buffer because there are times when Script2 is reading while Script1 is
writing, so Script2 may not get a full line/message, as detected by matching
open/close tags. In that case, the partial line is held in the buffer until
the next read; the first line of that read is appended to the buffer and, if
the open/close tags now match, the message is sent on and the buffer cleared.
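For comparison, the idea of holding a partial line until the rest arrives can be sketched in isolation. Note that this is a swapped-in technique, not the logic of the script below: it assumes every message ends with a newline and treats the newline, rather than matching tags, as the sign that a message is complete. The class name LineAssembler is hypothetical.

```python
class LineAssembler(object):
    """Collect text chunks and hand back only complete lines.

    Assumption: each message is terminated by a newline character, so
    anything after the last newline is an incomplete message.
    """

    def __init__(self):
        self.buf = ''

    def feed(self, chunk):
        """Add newly read text; return the list of complete lines."""
        self.buf += chunk
        parts = self.buf.split('\n')
        # The final piece has no newline yet, so keep it for the next read.
        self.buf = parts.pop()
        return parts
```

Feeding it '<x>1</x>\n<x>2' returns only the first message, and a later feed('</x>\n') returns the completed second one.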

Most (literally over 99%) of the time this works fine, but on rare
occasions something goes wrong and I lose a message. The only messages that
are lost are ones that have gone through the buffer. It's rare (26 of the
907 messages that went through the buffer failed, out of 17,000+ total
messages), but it's still data loss, so I need to figure out what I'm doing
wrong here.

code:
##################################################################
#!/usr/bin/env python

import socket
import time
from time import sleep
import logging

logging.basicConfig(format='%(asctime)s|%(levelname)s|%(message)s',
                    filename='streamer.log', level=logging.DEBUG)

rd1 = 'xmldata.log'
f1 = open(rd1, 'r')
#find initial end of file
f1.seek(0,2)
eof = f1.tell()
logging.info('eof = ' + str(eof))
f1.close()

f2 = open('streamerBufOUT.log', 'a')
f3 = open('bufOUT.log', 'a')

############# open socket ################
# transmit socket
socktx = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socktx_address = ('127.0.0.1', 2008)
logging.info('opening socktx on %s port %s' % socktx_address)
socktx.connect(socktx_address)
##########################################

#to read from beginning of file when starting script, uncomment
#eof = 0

i = 0
buf = ''
brokenLine = 0

def sendLine(line, i):
    s =  '\x02' + line[:-1] + '\x03'
    logging.info('sending line ' + str(i))
    print i
    socktx.send(s)  # send the entire line to the socket
    f2.write(s + '\n')
#    time.sleep(0.2)
    return

while True:
  with open(rd1, 'r') as f1:
     try:
        #check file size to see if grown, set neweof
        f1.seek(0,2)
        neweof = f1.tell()
        logging.info('check file size, set neweof at ' + str(neweof))
     except ValueError:
        f1 = open(rd1, 'r')
        f1.seek(0,2)
        neweof = f1.tell()
        logging.info('value error, neweof is ' + str(neweof))
     #if the file is larger...
     if neweof > eof:
        #go back to last position...
        f1.seek(eof)
        logging.info('neweof > eof, file has new lines, parsing')
# read new line in log.txt
        for line in f1:
            i += 1
            logging.debug('entering for line loop')
            #if a partial line is in the buffer
            if brokenLine == 1:
                i -= 1
                logging.debug('adding rest of line to buf')
                buf += line
                f3.write(buf + '\n')
                sendLine(buf, i)
                buf = ''
                brokenLine = 0
            #otherwise see if it's a full line
            elif '<objectdata ' in line:
                if '</objectdata>' in line:
                    logging.info('sending objectdata line ' + str(i))
                    brokenLine = 0
                    sendLine(line, i)
                else:
                    brokenLine = 1
                    buf += line
                    logging.info('broken line sending to buf line ' + str(i))
                    f3.write(buf + '\n')
            elif '<objectinfo>' in line:
                if '</objectinfo>' in line:
                    logging.info('sending objectinfo line ' + str(i))
                    brokenLine = 0
                    sendLine(line, i)
                else:
                    brokenLine = 1
                    buf += line
                    logging.info('broken line sending to buf line ' + str(i))
                    f3.write(buf + '\n')
            elif '<tracedata ' in line:
                if '</tracedata>' in line:
                    logging.info('sending tracedata line ' + str(i))
                    brokenLine = 0
                    sendLine(line, i)
                else:
                    brokenLine = 1
                    buf += line
                    logging.info('broken line sending to buf line ' + str(i))
                    f3.write(buf + '\n')
            elif '<heartbeatdata ' in line:
                if '</heartbeatdata>' in line:
                    logging.info('sending heartbeatdata line ' + str(i))
                    brokenLine = 0
                    sendLine(line, i)
                else:
                    brokenLine = 1
                    buf += line
                    logging.info('broken line sending to buf line ' + str(i))
                    f3.write(buf + '\n')
            else:
                logging.error('tag match fail line ' + str(i))
                buf = ''

        # update log.txt file size
        eof = neweof
        logging.info('eof set to neweof, sleeping 2, eof =' + str(eof))
        time.sleep(2)

     elif neweof < eof:
        # this resets eof at night when old log file zipped and new log
        # file started
        eof = 0
        logging.info('neweof < eof, set eof = 0, sleeping 2')
        time.sleep(2)

     elif neweof == eof:
        # file hasn't changed, do nothing
        logging.info('neweof = eof, file has not changed, sleeping 2')
        time.sleep(2)
#####################################################################

regards, Richard

-- 

*Java is like Alzheimers; it starts slow and eventually, it takes away all
of your memory.*

