[Python-Dev] Re: "groupby" iterator

David Eppstein eppstein at ics.uci.edu
Sat Nov 29 15:14:14 EST 2003


In article <000101c3b63f$c7fc4720$e841fea9 at oemcomputer>,
 "Raymond Hettinger" <python at rcn.com> wrote:

> Here is an implementation that translates readily into C.  It uses
> Guido's syntax and meets my requirement that bad things don't happen
> when someone runs the outer iterator independently of the inner
> iterator.

If I understand your code correctly, running the outer iterator skips 
over any uniterated values from the inner iterator.  I'd be happier with 
behavior like tee: the inner groups always return the same sequences of 
items, whether or not the inner iteration happens before the next outer 
iteration, but the memory cost is only small if you iterate through them 
in the expected order.  E.g., see the "out of order" unit test in the 
code below.

def identity(x): return x

def groupby(iterable,key=identity):
    it = iter(iterable)
    first = it.next()
    while 1:
        group = bygroup(it,first,key)
        yield key(first),group
        first = group.nextgroup()

class bygroup:
    """Iterator of items in a single group."""
    
    def __init__(self, iterable, first, key=identity):
        """Instance variables:
            - self.lookahead: reversed list of items still to be output
            - self.groupid: group identity
            - self.key: func to turn iterated items into group ids
            - self.it: iterator, or None once we reach another group
            - self.postfinal: None (only valid once self.it is None)
        """
        self.key = key
        self.it = iter(iterable)
        self.lookahead = [first]
        self.groupid = self.key(first)

    def __iter__(self):
        return self

    def group(self):
        return self.groupid

    def next(self):
        if self.lookahead:
            return self.lookahead.pop()
        if self.it is None:
            raise StopIteration
        x = self.it.next()
        if self.key(x) == self.groupid:
            return x
        self.postfinal = x
        self.it = None
        raise StopIteration

    def nextgroup(self):
        """Return first item of next group.
        Raises StopIteration if there is no next group."""
        if self.it is not None:
            L = list(self)
            L.reverse()
            self.lookahead = L
        if self.it is not None:
            raise StopIteration
        return self.postfinal

import unittest
from sets import Set as set

class TestBasicOps(unittest.TestCase):

    def test_groupby(self):
        # Check zero length input
        self.assertEqual([], list(groupby([],lambda r:r[0])))

        # Check normal input
        s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22),
             (2,15,22), (3,16,23), (3,17,23)]
        dup = []
        for k, g in groupby(s, lambda r:r[0]):
            for elem in g:
                self.assertEqual(k, elem[0])
                dup.append(elem)
        self.assertEqual(s, dup)
        
        # Check case where groups are iterated out of order
        nest1 = []
        for k,g in groupby(s, lambda r:r[0]):
            nest1.append(list(g))
        nest2 = []
        for k,g in groupby(s, lambda r:r[0]):
            nest2.append(g)
        nest2 = [list(g) for g in nest2]
        self.assertEqual(nest1,nest2)

        # Check nested case
        dup = []
        for k, g in groupby(s, lambda r:r[0]):
            for ik, ig in groupby(g, lambda r:r[2]):      
                for elem in ig:
                    self.assertEqual(k, elem[0])
                    self.assertEqual(ik, elem[2])               
                    dup.append(elem)
        self.assertEqual(s, dup)      

        # Check case where inner iterator is not used
        keys = []
        for k, g in groupby(s, lambda r:r[0]):
            keys.append(k)
        expectedkeys = set([r[0] for r in s])
        self.assertEqual(set(keys), expectedkeys)
        self.assertEqual(len(keys), len(expectedkeys))

suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestBasicOps))
unittest.TextTestRunner(verbosity=2).run(suite)

-- 
David Eppstein                      http://www.ics.uci.edu/~eppstein/
Univ. of California, Irvine, School of Information & Computer Science




More information about the Python-Dev mailing list