[Tutor] looping generator
Martin A. Brown
martin at linux-ip.net
Thu Jan 7 13:15:00 EST 2016
Hi there Richard,
>I have a stream of incoming XML data. I can receive the data,
>parse the data, etc., so long as I don't get fancy and I leave a
>minuscule delay in between each message. If I get rid of the time
>delay, which I need to do, the script has to process the incoming
>messages continuously. Here's what I have:
To begin, I have a suggestion that is not specifically a Python
suggestion. I have read several of your prior emails describing
your problem.
If I were faced with the problem of receiving and processing data
from remote "dumb" nodes, I would separate the software components
into, at least, two distinct pieces.
1. Trust the filesystem. Write one software component that
receives the data from the wire and writes it out to a
configurable directory. If, for whatever reason, you lose the
data, the performance of the rest of the system does not
matter. Thus, capturing the data is the most important first
step. Use your system's daemonization tools to run this
service.
2. Improve performance of the parsing and processing tools.
Teach the tools how to read the data stored in the filesystem
and iteratively locate hot spots, performance issues, parsing
problems or data shortcomings.
Here are a few disorganized thoughts about why and how to do it this
way:
* the network listener becomes much simpler since it will not
parse, and will only write out to disk
* let's assume each XML chunk is about 128k and you have 30 data
  sources, each sending 4 chunks per second; that is a total data
  volume of about 15MiB per second, easily received and written to
  disk on modern hardware
* you could also segregate the XML chunks by data source (and
  maybe also by time), writing each chunk out to the filesystem;
  if you broke each message into its own file, that would be a
  large number of files (with attendant open() and close() costs),
  so perhaps write out a new file every fifteen minutes or every
  hour; here's a possible naming scheme, one file per source per
  hour (see the sketch after this list):

      received/2016/01/07/0000-10.143.17.227.data
      received/2016/01/07/0100-10.143.17.227.data
      received/2016/01/07/0200-10.143.17.227.data
      ...
      received/2016/01/07/2300-10.143.17.227.data

  with 30 sources, that would leave you with about 720 files per
  daily directory, something that is eminently manageable for
  modern filesystems (and for any pesky humans who happen to be
  wandering around)
* if you write the stream of data out to the filesystem, your
  network listener need only locate the \x02 byte and the \x03
  byte; it could then guarantee that every file it writes begins
  with \x02 and ends with \x03 (the sketch after this list does
  exactly this)
* you can independently upgrade the parsing and processing tools
and the data recording service
* if you retain these files, you can "replay" the past (errors,
bursts, reprocessing); alternatively simply delete the files
after they are processed for downstream consumers
* separating the responsibilities of each software component also
simplifies your diagnosis and software authorship process; first
you can make sure that you are recording the data properly; once
that is done, you can start to process your data, moving along
to performance questions next
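To make that first component concrete, here is a minimal sketch of
such a recording listener. It is untested, assumes the \x02/\x03
framing you have described, and the names open_hourly_file() and
record_stream() are my inventions for illustration:

    import os
    import time

    STX, ETX = '\x02', '\x03'

    def open_hourly_file(source_ip):
        # received/YYYY/MM/DD/HH00-<source>.data, following the
        # naming scheme above; rotating the file on the hour is
        # left out for brevity
        now = time.gmtime()
        dirname = time.strftime('received/%Y/%m/%d', now)
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
        fname = '%s-%s.data' % (time.strftime('%H00', now), source_ip)
        return open(os.path.join(dirname, fname), 'ab')

    def record_stream(connection, source_ip):
        # accumulate bytes off the wire and write only complete
        # STX...ETX messages, so every file begins with \x02 and
        # ends with \x03; note that no parsing happens here at all
        buf = ''
        out = open_hourly_file(source_ip)
        while True:
            chunk = connection.recv(65536)
            if not chunk:          # peer closed the connection
                break
            buf += chunk
            while True:
                start = buf.find(STX)
                end = buf.find(ETX, start + 1)
                if start == -1 or end == -1:
                    break          # no complete message buffered yet
                out.write(buf[start:end + 1])
                buf = buf[end + 1:]
        out.close()

You would call record_stream(connection, client_address[0]) from
your accept() loop; because this component never parses anything,
there is very little in it that can go wrong.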
Now, below, I have a few Python-specific points or questions:
>#!/usr/bin/env python
>
>import socket
>import lxml.etree as ET
>
>def dataRecv(connection):
> print 'receiving'
> while True:
> data = connection.recv(65536)
> while True:
> print "writing to data.in"
> f2.write(data)
> start = data.find('\x02')
> end = data.find('\x03')
> message = data[start+1:end]
> print "writing to messages.out"
> f3.write(message)
> yield message
You do not define f2 and f3 until further down. If you are going
to do this, pass f2 and f3 into the function as parameters, i.e.
def dataRecv(connection, f2, f3):
....
while True:
# wait for a connection
connection, client_address = sock.accept()
q = dataRecv(connection, f2, f3)
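Better still (this is only a sketch, and dataParse would need f1
passed to it in the same way), let a with-statement manage the
files, so that they are closed for you even on error:

    with open('parser.out', 'a') as f1, \
         open('data.in', 'a') as f2, \
         open('messages.out', 'a') as f3:
        while True:
            # wait for a connection
            connection, client_address = sock.accept()
            q = dataRecv(connection, f2, f3)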
>def dataParse(message):
> print 'parsing'
> xslt = ET.parse('stack13.xsl')
> dom = ET.XML(message)
> transform = ET.XSLT(xslt)
> newdom = transform(dom)
> f1.write(str(newdom))
>
>
>sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>sock_addr = ('', 2008)
>#data = sock.makefile('r')
>sock.bind(sock_addr)
>sock.listen(5)
>print 'listening'
>
>f1 = open('parser.out', 'a')
>print "opening parser.out"
>f2 = open('data.in', 'a')
>print "opening data.in"
>f3 = open('messages.out', 'a')
>print "opening messages.out"
>
>while True:
> # wait for a connection
> connection, client_address = sock.accept()
> q = dataRecv(connection)
> dataParse(q.next())
>
># close sockrx
>#connection.close()
>
>f1.close()
>
>
By the way, keep on breaking these things into functions! This is
the way to go.
>In the dataRecv function, I have tried (where you see while True)
>if data, while data and while True. Regardless, it doesn't loop;
>it receives all ten messages from the test file being sent, but
>only processes the first message and then stops (it does not
>exit). I feel like I'm missing something obvious but can't find it.
The problem starts in your dataRecv function:
    def dataRecv(connection):
        print 'receiving'
        while True:
            data = connection.recv(65536)         # -- A: receive all data
            while True:
                print "writing to data.in"
                f2.write(data)
                start = data.find('\x02')
                end = data.find('\x03')
                message = data[start+1:end]
                print "writing to messages.out"
                f3.write(message)
                yield message                     # -- B: yield one message

    while True:
        # wait for a connection
        connection, client_address = sock.accept()  # -- D: wait on network
        q = dataRecv(connection)
        dataParse(q.next())                         # -- C: process one message
Consider what happens when you read 64k bytes into the variable
called 'data'.
A: This probably reads all of the data at once (all ten messages).
You then locate the first \x02 and then the following \x03. (N.B.
You are also assuming that they will occur in that order in your
data; they might not.)
B: Then you are happy that you have identified the first message.
You yield it, and it is handed off to the dataParse function.
C: Now, you take that message and parse it.
D: And we go back to sock.accept(), abandoning the generator and
all of the unprocessed data still sitting in the variable 'data'
inside dataRecv.
Specifically, your problem is about breaking the data apart and
using it all. You might benefit from studying techniques for
breaking a text apart by paragraph. Think about how this applies to
your problem:
http://code.activestate.com/recipes/66063-read-a-text-file-by-paragraph/#c1
N.B. The code example there may not be utterly perfect, but it
tackles precisely the problem that you are having.
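To show how that technique maps onto your framing bytes, here is
one possible rewrite (a sketch only, untested against your data):
keep a buffer that survives across recv() calls, and yield every
complete message found in it before reading more:

    def dataRecv(connection, f2, f3):
        buf = ''
        while True:
            data = connection.recv(65536)
            if not data:           # peer closed the connection
                return
            f2.write(data)
            buf += data
            # yield every complete \x02...\x03 message currently
            # buffered, then loop back to recv() for more bytes
            while True:
                start = buf.find('\x02')
                end = buf.find('\x03', start + 1)
                if start == -1 or end == -1:
                    break
                message = buf[start + 1:end]
                f3.write(message)
                buf = buf[end + 1:]
                yield message

    while True:
        # wait for a connection
        connection, client_address = sock.accept()
        for message in dataRecv(connection, f2, f3):
            dataParse(message)

The for-loop is the other half of the fix: it keeps pulling
messages out of the generator until the connection closes, instead
of calling q.next() exactly once and then returning to
sock.accept().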
Good luck and enjoy,
-Martin
--
Martin A. Brown
http://linux-ip.net/