[Tutor] reading an input stream

Cameron Simpson cs at zip.com.au
Thu Jan 7 16:52:14 EST 2016


On 07Jan2016 12:14, richard kappler <richkappler at gmail.com> wrote:
>On Thu, Jan 7, 2016 at 12:07 PM, James Chapman <james at uplinkzero.com> wrote:
>> From an architectural POV I'd have a few listener threads that upon
>> receipt would spawn (or take from a pool is a better approach) a worker
>> thread to process the received data.

As would I.

>That's the plan, if I'm understanding you correctly. We've brainstormed the
>threading, haven't written any of it yet.

The code you've posted should be fine for testing a single connection.

I'd be doing 2 things to what you posted, myself:

  - use plain old .read to collect the data and assemble the XML packets

  - decouple your XML parsing from the collection and packet parsing

To the first, I suspect that when you have your packets arriving rapidly you are 
either dropping data because the data overflows your 8192 recv size or you're 
getting multiple logical packets stuffed into a buffer:

  recv #1:
    \x02xml...\x03\x02partial-xml

  recv #2:
    tail-of-previous-xml\x03\x02more-xml...

which would definitely make your XML parser unhappy.
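To make that concrete, here is a small sketch of the failure mode. The two buffers and the helper name `complete_packets` are made up for illustration; they are not from your code. Looking at each recv() result on its own loses the packet that straddles the boundary, while looking at the joined stream finds both:

```python
# Hypothetical recv() results: two logical packets split across two buffers.
recv1 = b'\x02<a>1</a>\x03\x02<b>par'
recv2 = b'tial</b>\x03'

def complete_packets(buf):
    """Return the payloads in `buf` that have both a 0x02 start and a 0x03 end."""
    packets = []
    pos = 0
    while True:
        start = buf.find(b'\x02', pos)
        if start < 0:
            break
        end = buf.find(b'\x03', start + 1)
        if end < 0:
            break
        packets.append(buf[start + 1:end])
        pos = end + 1
    return packets

# Treating each buffer as self-contained misses the split packet.
per_buffer = complete_packets(recv1) + complete_packets(recv2)
# Treating the data as one continuous stream recovers both packets.
joined = complete_packets(recv1 + recv2)
```

Here `per_buffer` holds only the first packet, while `joined` holds both.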

Instead, gather the data progressively and emit XML chunks. You've got a TCP 
stream - the TCPServer class will do an accept and hand you a binary stream 
file from which you can just read, ignoring any arbitrary "packet" sizes.  
For example (totally untested) using a generator:

  from logging import error, warning

  def xml_extractor(fp):
    ''' Read a continuous stream of bytes from `fp`, yield the bytes of each
    XML packet to be parsed elsewhere. An arbitrary size of 8192 bytes is used
    to request more data; it doesn't say anything about any underlying network
    packet size.
    '''
    # a (buffer, offset) pair of ungathered data
    buffer = b''
    offset = 0
    # process one packet per iteration, until EOF
    while True:
      # locate start of XML chunk
      while True:
        if offset >= len(buffer):
          buffer = fp.read1(8192)
          offset = 0
          if not buffer:
            # EOF: exit generator
            return
        # examine the next byte (an int in Python 3)
        b = buffer[offset]
        offset += 1
        if b == 0x02:
          # opening delimiter
          break
        warning("discard byte 0x%02x", b)
      # gather XML chunk
      chunks = []
      while True:
        endpos = buffer.find(b'\x03', offset)
        if endpos < 0:
          # no delimiter, collect entire chunk
          chunks.append(buffer[offset:])
          offset = len(buffer)
        else:
          # collect up to the delimiter
          chunks.append(buffer[offset:endpos])
          offset = endpos + 1
          break
        # keep collecting...
        if offset >= len(buffer):
          buffer = fp.read1(8192)
          offset = 0
          if not buffer:
            error("EOF: incomplete final XML packet found: %r", b''.join(chunks))
            return
      # yield the XML bytes
      yield b''.join(chunks)
      chunks = None   # release chunks so memory can be freed promptly

This reads bytes into a buffer and locates the 0x02...0x03 boundaries and 
yields the bytes in between. Then your main stream decoder just looks like 
this:

  for xml_bytes in xml_extractor(fp):
    # decode the bytes into a str
    xml_s = xml_bytes.decode('utf-8')
    # ... pass xml_s to your XML parser ...
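
As for the second suggestion, decoupling the XML parsing from the collection, one possible shape is a queue between the collector and a parser thread. This is only a sketch: the names `parse_worker` and `run_demo` are invented here, the list of packets stands in for whatever your extractor yields, and I'm assuming xml.etree.ElementTree as the parser, which may not be what you use.

```python
import queue
import threading
import xml.etree.ElementTree as ET

def parse_worker(q, results):
    """Take XML byte strings from `q` and parse them until a None sentinel arrives."""
    while True:
        xml_bytes = q.get()
        if xml_bytes is None:
            break
        try:
            root = ET.fromstring(xml_bytes.decode('utf-8'))
        except ET.ParseError as e:
            # a bad packet is recorded, it does not stop the collector
            results.append(('error', str(e)))
        else:
            results.append(('ok', root.tag))

def run_demo(packets):
    """Feed `packets` (an iterable of bytes) to a parser thread via a queue."""
    q = queue.Queue()
    results = []
    worker = threading.Thread(target=parse_worker, args=(q, results))
    worker.start()
    for xml_bytes in packets:   # stands in for the loop over the extractor
        q.put(xml_bytes)
    q.put(None)                 # sentinel: no more packets
    worker.join()
    return results

results = run_demo([b'<event id="1"/>', b'<broken', b'<event id="2"/>'])
```

The point of the queue is that a slow or failing parse never holds up reading from the socket.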

All of this presumes you have a binary file-like object reading from your TCP 
stream. And since we're suggesting you spawn a Thread per connection, I'm 
suggesting you use the TCPServer class from the socketserver module, using its 
ThreadingMixIn. That gets you a threaded TCP server.

Query: do the cameras connect to you, or do you connect to the cameras? I'm 
presuming the former.

So the surrounding framework would create a TCPServer instance listening on 
your ip:port, with a request handler class which TCPServer instantiates for 
each connection. If that class derives from StreamRequestHandler, the handler 
has a .rfile attribute which is a read-only binary stream for reading from the 
socket, and _that_ is what we refer to as `fp` in the code above.

Setting up the TCPServer is pretty simple. Lifting the essential bits from some 
code of my own (again, untested):

  from socketserver import TCPServer, ThreadingMixIn, StreamRequestHandler

  class MyServer(ThreadingMixIn, TCPServer):
    def __init__(self, bind_addr):
      TCPServer.__init__(self, bind_addr, MyRequestHandler)

  class MyRequestHandler(StreamRequestHandler):
    def handle(self):
      fp = self.rfile
      for xml_bytes in xml_extractor(fp):
        # decode the bytes into a str
        xml_s = xml_bytes.decode('utf-8')
        # ... pass xml_s to your XML parser ...

  # start the server
  S = MyServer( ("hostname", 9999) )
  S.serve_forever()
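
If you want to try this arrangement end to end without a camera, a loopback test along these lines might help. It is a sketch under assumptions: `frames` is a compact stand-in for the extractor (not the same code), `DemoHandler`, `DemoServer` and `run_demo` are names invented here, and binding to 127.0.0.1 port 0 just asks the OS for any free port. The client deliberately splits a packet across two sends.

```python
import socket
import threading
from socketserver import TCPServer, ThreadingMixIn, StreamRequestHandler

def frames(fp):
    """Compact stand-in for the extractor: yield bytes between 0x02 and 0x03."""
    pending = b''
    while True:
        data = fp.read1(8192)
        if not data:
            return              # EOF
        pending += data
        while True:
            start = pending.find(b'\x02')
            if start < 0:
                pending = b''   # no opening delimiter: discard stray bytes
                break
            end = pending.find(b'\x03', start + 1)
            if end < 0:
                pending = pending[start:]   # keep the partial packet
                break
            yield pending[start + 1:end]
            pending = pending[end + 1:]

class DemoHandler(StreamRequestHandler):
    def handle(self):
        for xml_bytes in frames(self.rfile):
            self.server.received.append(xml_bytes)
        self.server.done.set()  # client closed the connection

class DemoServer(ThreadingMixIn, TCPServer):
    daemon_threads = True
    def __init__(self, bind_addr):
        TCPServer.__init__(self, bind_addr, DemoHandler)
        self.received = []
        self.done = threading.Event()

def run_demo():
    server = DemoServer(("127.0.0.1", 0))
    threading.Thread(target=server.serve_forever, daemon=True).start()
    with socket.create_connection(server.server_address) as sock:
        # two logical packets, the second split across two sends
        for fragment in (b'\x02<a/>\x03\x02<b', b'>text</b>\x03'):
            sock.sendall(fragment)
    finished = server.done.wait(timeout=5)
    server.shutdown()
    server.server_close()
    return finished, server.received
```

Calling `run_demo()` returns whether the handler finished and the list of packets it saw.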

One critical bit in the above is the use of .read1() in the xml_extractor 
function: that calls the underlying stream's .read() method at most once, so 
that it behaves like a UNIX read() call and may return a "short" read - less 
than the maximum supplied. This is what you need to return data as soon as it 
is received. By contrast, the traditional Python .read() call will try to 
gather bytes until it has the amount asked for or reaches EOF, which means that 
it can block while a complete packet sits waiting in the buffer. 
You definitely need read1() for this kind of work.
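
A quick way to see the read1() behaviour for yourself, assuming a connected socket pair is a fair stand-in for a camera connection (the variable names are mine):

```python
import socket

# A connected pair of sockets stands in for the camera and the server.
sender, receiver = socket.socketpair()
reader = receiver.makefile('rb')   # buffered binary stream, like rfile

packet = b'\x02<a/>\x03'
sender.sendall(packet)             # a short packet, far less than 8192 bytes
data = reader.read1(8192)          # returns as soon as some data is available
# reader.read(8192) at this point would block, waiting for 8192 bytes or EOF

sender.close()
eof = reader.read1(8192)           # b'' once the peer has closed

reader.close()
receiver.close()
```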

Cheers,
Cameron Simpson <cs at zip.com.au>

