[Tutor] reading an input stream
Cameron Simpson
cs at zip.com.au
Thu Jan 7 16:52:14 EST 2016
On 07Jan2016 12:14, richard kappler <richkappler at gmail.com> wrote:
>On Thu, Jan 7, 2016 at 12:07 PM, James Chapman <james at uplinkzero.com> wrote:
>> From an architectural POV I'd have a few listener threads that upon
>> receipt would spawn (or take from a pool is a better approach) a worker
>> thread to process the received data.
As would I.
>That's the plan, if I'm understanding you correctly. We've brainstormed the
>threading, haven't written any of it yet.
The code you've posted should be fine for testing a single connection.
I'd be doing 2 things to what you posted, myself:
- use plain old .read to collect the data and assemble the XML packets
- decouple your XML parsing from the collection and packet parsing
To the first, I suspect that when you have your packets arriving rapidly you are
either dropping data because the data overflows your 8192 recv size or you're
getting multiple logical packets stuffed into a buffer:
    recv #1:
        \x02xml...\x03\x02partial-xml
    recv #2:
        tail-of-previous-xml\x03\x02more-xml...
which would definitely make your XML parser unhappy.
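To make that concrete, here is a small sketch (mine, not taken from your code) which cuts a byte stream into fixed size pieces the way successive recv() calls might. The helper name split_like_recv and the tiny size of 10 are made up for the illustration; a real recv() can return any length up to the size you ask for.

```python
def split_like_recv(stream, size):
    """Cut `stream` into pieces of at most `size` bytes, as recv() might."""
    return [stream[i:i + size] for i in range(0, len(stream), size)]

# three complete logical packets, each wrapped in 0x02 ... 0x03
stream = b'\x02<a/>\x03\x02<b/>\x03\x02<c/>\x03'
pieces = split_like_recv(stream, 10)

# pieces which happen to hold exactly one whole packet and nothing else
complete = [
    p for p in pieces
    if p.startswith(b'\x02') and p.endswith(b'\x03') and p.count(b'\x02') == 1
]

for n, piece in enumerate(pieces, 1):
    print("recv #%d: %r" % (n, piece))
print("pieces holding exactly one packet:", len(complete))
```

Neither piece lines up with a packet boundary, so handing each piece straight to an XML parser cannot work.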
Instead, gather the data progressively and emit XML chunks. You've got a TCP
stream - the TCPServer class will do an accept and hand you an _unbuffered_
binary stream file from which you can just .read(), ignoring any arbitrary
"packet" sizes. For example (totally untested) using a generator:
    from logging import warning, error

    def xml_extractor(fp):
        ''' Read a continuous stream of bytes from `fp`, yield bytes to be parsed
            elsewhere. An arbitrary size of 8192 bytes is used to request more data; it
            doesn't say anything about any underlying network packet size.
        '''
        # a (buffer, offset) pair of ungathered data
        buffer = b''
        offset = 0
        # loop so that every packet in the stream is yielded, not just the first
        while True:
            # locate start of XML chunk
            while True:
                if offset >= len(buffer):
                    buffer = fp.read1(8192)
                    offset = 0
                    if not buffer:
                        # EOF: exit generator
                        return
                # examine the next byte
                b = buffer[offset]
                offset += 1
                if b == 0x02:
                    # opening delimiter
                    break
                warning("discard byte 0x%02x", b)
            # gather XML chunk
            chunks = []
            while True:
                endpos = buffer.find(b'\x03', offset)
                if endpos < 0:
                    # no delimiter, collect entire chunk
                    chunks.append(buffer[offset:])
                    offset = len(buffer)
                else:
                    # collect up to the delimiter
                    chunks.append(buffer[offset:endpos])
                    offset = endpos + 1
                    break
                # keep collecting...
                if offset >= len(buffer):
                    buffer = fp.read1(8192)
                    offset = 0
                    if not buffer:
                        error("EOF: incomplete final XML packet found: %r", b''.join(chunks))
                        return
            # yield the XML bytes
            yield b''.join(chunks)
            chunks = None # release chunks so memory can be freed promptly
This reads bytes into a buffer and locates the 0x02...0x03 boundaries and
yields the bytes in between. Then your main stream decoder just looks like
this:
    for xml_bytes in xml_extractor(fp):
        # decode the bytes into a str
        xml_s = xml_bytes.decode('utf-8')
        # ... pass xml_s to your XML parser ...
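As one possible way to fill in the parsing step, here is a sketch using the standard library's xml.etree.ElementTree. I don't know which XML parser you are actually using, so treat the choice as an assumption; parse_packet is a made-up helper name, and the sample payload is invented.

```python
import xml.etree.ElementTree as ET

def parse_packet(xml_bytes):
    """Decode one delimited payload and parse it.

    Returns the root Element, or None if the payload is not valid
    UTF-8 or not well formed XML.
    """
    try:
        xml_s = xml_bytes.decode('utf-8')
        return ET.fromstring(xml_s)
    except (UnicodeDecodeError, ET.ParseError):
        return None

# an invented payload, as it might arrive between 0x02 and 0x03
root = parse_packet(b'<event id="7"><name>door</name></event>')
if root is not None:
    print(root.tag, root.get('id'), root.find('name').text)
```

Returning None for a bad payload keeps one malformed packet from killing the loop that reads the stream; whether you would rather log it or raise is up to you.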
All of this presumes you have a binary file-like object reading from your TCP
stream. And since we're suggesting you spawn a Thread per connection, I'm
suggesting you use the TCPServer class from the socketserver module, using its
ThreadingMixIn. That gets you a threading TCP server.
Query: do the cameras connect to you, or do you connect to the cameras? I'm
presuming the former.
So the surrounding framework would create a TCPServer instance listening on your
ip:port, along with a request handler class which TCPServer instantiates for
each connection. A StreamRequestHandler instance has a .rfile attribute which is
a read-only binary stream for reading from the socket, and _that_ is what we
refer to as `fp` in the code above.
Setting up the TCPServer is pretty simple. Lifting the essential bits from some
code of my own (again, untested):
    from socketserver import TCPServer, ThreadingMixIn, StreamRequestHandler

    class MyServer(ThreadingMixIn, TCPServer):
        def __init__(self, bind_addr):
            TCPServer.__init__(self, bind_addr, MyRequestHandler)

    class MyRequestHandler(StreamRequestHandler):
        def handle(self):
            fp = self.rfile
            for xml_bytes in xml_extractor(fp):
                # decode the bytes into a str
                xml_s = xml_bytes.decode('utf-8')
                # ... pass xml_s to your XML parser ...

    # start the server
    S = MyServer( ("hostname", 9999) )
    S.serve_forever()
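On the second point from earlier (decoupling the parsing from the collection), one common arrangement is for the handler to drop each payload onto a queue.Queue and let a separate worker thread do the parsing. The following is only a sketch of that idea: parser_worker, the None sentinel and the `handle` callback are all names and conventions I've invented here, and in real use `handle` would call your XML parser.

```python
import queue
import threading

def parser_worker(packets, handle):
    """Pull payloads off `packets` and pass each to `handle`.

    A None item is used as a sentinel meaning "no more work".
    """
    while True:
        xml_bytes = packets.get()
        if xml_bytes is None:
            break
        handle(xml_bytes)

packets = queue.Queue()
results = []

# stand-in for real parsing: just decode and remember the text
worker = threading.Thread(
    target=parser_worker,
    args=(packets, lambda b: results.append(b.decode('utf-8'))),
)
worker.start()

# a request handler would do packets.put(xml_bytes) inside its loop
for payload in (b'<a/>', b'<b>text</b>'):
    packets.put(payload)
packets.put(None)   # tell the worker to finish
worker.join()
print(results)
```

With this shape a slow parse never delays reading from the socket; the queue just grows until the worker catches up.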
One critical bit in the above is the use of .read1() in the xml_extractor
function: that calls the underlying stream's .read() method at most once, so
that it behaves like a UNIX read() call and may return a "short" read - less
than the maximum supplied. This is what you need to return data as soon as it
is received. By contrast, the traditional Python .read() call will try to
gather bytes until it has the amount asked for, which means that it will block.
You definitely need read1() for this kind of work.
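If you want to see the short read behaviour for yourself, here is a small experiment using a connected pair of local sockets in place of a camera and the server. The names are mine, and socket.socketpair() is just a convenient stand-in for a real TCP connection.

```python
import socket

# a connected pair of sockets: one end plays the camera, one the server
sender, receiver = socket.socketpair()
fp = receiver.makefile('rb')    # buffered binary reader, much like rfile

message = b'\x02<x/>\x03'
sender.sendall(message)         # far fewer than 8192 bytes

# read1 returns whatever has arrived so far instead of waiting for 8192
data = fp.read1(8192)
print("read1 returned %d bytes" % len(data))
# fp.read(8192) at this point would block until 8192 bytes had arrived
# or the sender closed the connection

sender.close()
at_eof = fp.read1(8192)         # empty bytes once the peer has closed
print("after close:", at_eof)

fp.close()
receiver.close()
```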
Cheers,
Cameron Simpson <cs at zip.com.au>