[Tutor] reading an input stream

Danny Yoo dyoo at hashcollision.org
Thu Dec 24 16:54:40 EST 2015


> I think what I need to do would be analogous to (pardon if I'm using the
> wrong terminology, at this point in the discussion I am officially out of
> my depth) sending the input stream to a buffer(s) until the ETX for that
> message comes in, shoot the buffer contents to the parser while accepting
> the next STX + message fragment into the buffer, or something analogous.

Yes, I agree.  It sounds like you want one process to read the socket
and collect chunks of bytes delimited by the STX/ETX markers.  It can
then send those chunks to the XML parser.


We can imagine one process that reads the socket and spits out a list
of byte chunks:

    chunks = readDelimitedChunks(socket)

and another process that parses those chunks and does something with them:

    for chunk in chunks:
        ....


It would be nice if we could organize the program like this.  But one
problem is that chunks might not be finite!  The socket might keep on
returning bytes.  If it keeps returning bytes, we can't possibly
return a finite list of the chunked bytes.


What we really want is something like:

    chunkStream = readDelimitedChunks(socket)
    for chunk in chunkStream:
        ....

where chunkStream is itself like a socket: it should be something that
we can repeatedly read from as if it were potentially infinite.


We can actually do this, and it isn't too bad.  There's a mechanism in
Python called a generator that allows us to write function-like things
that consume streams of input and produce streams of output.  Here's a
brief introduction to them.

For example, here's a generator that knows how to produce an infinite
stream of numbers:

##############
def nums():
    n = 0
    while True:
        yield n
        n += 1
##############

What distinguishes a generator from a regular function?  The use of
"yield".  A "yield" is like a return, but rather than completely
escaping the function with the return value, the generator remembers
where it was at that point.  Why?  Because it can *resume* itself when
we try to get another value out of the generator.

Let's try it out:

#####################

>>> numStream = nums()
>>> next(numStream)
0
>>> next(numStream)
1
>>> next(numStream)
2
>>> next(numStream)
3
>>> next(numStream)
4
#####################

Every call to next() resumes the generator from where it left off and
runs it until it reaches its next "yield".  That's how this generator
can produce an infinite sequence of values.
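By the way, a for loop makes those next() calls for us automatically,
and itertools.islice from the standard library lets us take a finite
slice of an otherwise infinite generator:

```python
from itertools import islice

def nums():
    n = 0
    while True:
        yield n
        n += 1

# islice lazily pulls just the first five values out of the
# otherwise infinite generator; list() collects them.
first_five = list(islice(nums(), 5))
print(first_five)   # prints [0, 1, 2, 3, 4]
```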


That's how we produce infinite sequences.  And we can write another
generator that knows how to take a stream of numbers, and square each
one.

########################
def squaring(stream):
    for n in stream:
        yield n * n


Let's try it.


########################

>>> numStream = nums()
>>> squaredNums = squaring(numStream)
>>> next(squaredNums)
0
>>> next(squaredNums)
1
>>> next(squaredNums)
4
>>> next(squaredNums)
9
>>> next(squaredNums)
16
########################


If you have experience with other programming languages, you may have
heard of the term "co-routine".  What we're doing with this should be
reminiscent of coroutine-style programming.  We have one generator
feeding input into the other, with program control bouncing back and
forth between the generators as necessary.


So that's the basic idea of generators.  They let us write processes
that consume and produce streams of data.  In the context of sockets,
this is particularly helpful, because a socket is effectively an
unbounded stream of bytes.


Here's another toy example that's closer to the problem you're trying
to solve.  Let's say that we're working on a program to alphabetize
the letters of each word in a sentence.  Very useless, of course.  :P
We might give it the input:

    this
    is
    a
    test
    of
    the
    emergency
    broadcast
    system

and expect to get back the following sentence:

     hist
     is
     a
     estt
     fo
     eht
     ceeegmnry
     aabcdorst
     emssty

We can imagine one process doing chunking, going from a sequence of
characters to a sequence of words:

###########################################
def extract_words(seq):
    """Yield the words in a sequence of characters."""
    buffer = []
    for ch in seq:
        if ch.isalpha():
            buffer.append(ch)
        elif buffer:
            yield ''.join(buffer)
            del buffer[:]
    # If we hit the end of the input, the buffer might
    # still hold one last word to yield.
    if buffer:
        yield ''.join(buffer)
###########################################


and a function that transforms words to their munged counterpart:

#########################
def transform(word):
    """Munge a word into its alphabetized form."""
    chars = list(word)
    chars.sort()
    return ''.join(chars)
#########################
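Repeating the definition so the snippet stands on its own, we can
check it against the "emergency" example from above:

```python
def transform(word):
    """Munge a word into its alphabetized form."""
    chars = list(word)
    chars.sort()
    return ''.join(chars)

print(transform('emergency'))   # prints ceeegmnry
print(transform('of'))          # prints fo
```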

This forms the major components of a program that can do the munging
on a file... or a socket!


Here's the complete example:


#############################################
import sys

def extract_words(seq):
    """Yield the words in a sequence of characters."""
    buffer = []
    for ch in seq:
        if ch.isalpha():
            buffer.append(ch)
        elif buffer:
            yield ''.join(buffer)
            del buffer[:]
    # If we hit the end of the input, the buffer might
    # still hold one last word to yield.
    if buffer:
        yield ''.join(buffer)

def transform(word):
    """Munge a word into its alphabetized form."""
    chars = list(word)
    chars.sort()
    return ''.join(chars)


def as_byte_seq(f):
    """Return the bytes of the file-like object f as a
    sequence."""
    while True:
        ch = f.read(1)
        if not ch: break
        yield ch


if __name__ == '__main__':
    for word in extract_words(as_byte_seq(sys.stdin)):
        print(transform(word))
############################################
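Back to your original problem: the same buffering pattern in
extract_words can split a byte stream on your STX/ETX markers instead
of on non-letters.  This is only a sketch, under the assumption that
each message is framed as STX ... ETX with the usual control bytes
0x02 and 0x03; read_delimited_chunks and the framing details are
stand-ins for whatever your protocol actually uses:

```python
import io

STX = b'\x02'   # start-of-text marker (assumed framing)
ETX = b'\x03'   # end-of-text marker (assumed framing)

def read_delimited_chunks(byte_seq):
    """Yield each STX...ETX-framed payload found in a sequence
    of single bytes, with the markers stripped off."""
    buffer = []
    inside = False
    for b in byte_seq:
        if b == STX:
            inside = True
            del buffer[:]          # start a fresh message
        elif b == ETX:
            if inside:
                yield b''.join(buffer)
            inside = False
        elif inside:
            buffer.append(b)

def as_byte_seq(f):
    """Return the bytes of the file-like object f as a sequence."""
    while True:
        ch = f.read(1)
        if not ch:
            break
        yield ch

# Simulate a socket with an in-memory byte stream: two framed
# messages, with some junk between them that gets discarded.
fake_socket = io.BytesIO(b'\x02<a>1</a>\x03junk\x02<b>2</b>\x03')
chunks = list(read_delimited_chunks(as_byte_seq(fake_socket)))
print(chunks)   # prints [b'<a>1</a>', b'<b>2</b>']
```

Each chunk that comes out is a complete message body, ready to be
handed to the XML parser, and the generator keeps running for as long
as the socket keeps producing bytes.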



If you have questions, please feel free to ask.  Good luck!

