[pypy-svn] r77066 - in pypy/branch/fast-forward/pypy/module/itertools: . test
afa at codespeak.net
afa at codespeak.net
Tue Sep 14 16:16:51 CEST 2010
Author: afa
Date: Tue Sep 14 16:16:50 2010
New Revision: 77066
Modified:
pypy/branch/fast-forward/pypy/module/itertools/__init__.py
pypy/branch/fast-forward/pypy/module/itertools/interp_itertools.py
pypy/branch/fast-forward/pypy/module/itertools/test/test_itertools.py
Log:
Implement itertools.izip_longest()
Modified: pypy/branch/fast-forward/pypy/module/itertools/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/itertools/__init__.py (original)
+++ pypy/branch/fast-forward/pypy/module/itertools/__init__.py Tue Sep 14 16:16:50 2010
@@ -35,6 +35,7 @@
'imap' : 'interp_itertools.W_IMap',
'islice' : 'interp_itertools.W_ISlice',
'izip' : 'interp_itertools.W_IZip',
+ 'izip_longest' : 'interp_itertools.W_IZipLongest',
'repeat' : 'interp_itertools.W_Repeat',
'starmap' : 'interp_itertools.W_StarMap',
'takewhile' : 'interp_itertools.W_TakeWhile',
Modified: pypy/branch/fast-forward/pypy/module/itertools/interp_itertools.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/itertools/interp_itertools.py (original)
+++ pypy/branch/fast-forward/pypy/module/itertools/interp_itertools.py Tue Sep 14 16:16:50 2010
@@ -1,7 +1,8 @@
from pypy.interpreter.baseobjspace import Wrappable
from pypy.interpreter.error import OperationError
from pypy.interpreter.typedef import TypeDef, make_weakref_descr
-from pypy.interpreter.gateway import interp2app, ObjSpace, W_Root
+from pypy.interpreter.gateway import interp2app, ObjSpace, W_Root, unwrap_spec
+from pypy.interpreter.argument import Arguments
from pypy.rlib.rarithmetic import ovfcheck
class W_Count(Wrappable):
@@ -545,6 +546,67 @@
""")
+class W_IZipLongest(W_IMap):
+ _error_name = "izip_longest"
+
+ def next_w(self):
+ space = self.space
+ nb = len(self.iterators_w)
+
+ if nb == 0:
+ raise OperationError(space.w_StopIteration, space.w_None)
+
+ objects_w = [None] * nb
+ for index in range(nb):
+ w_value = self.w_fillvalue
+ w_it = self.iterators_w[index]
+ if w_it is not None:
+ try:
+ w_value = space.next(w_it)
+ except OperationError, e:
+ if not e.match(space, space.w_StopIteration):
+ raise
+
+ self.active -= 1
+ if self.active == 0:
+ # It was the last active iterator
+ raise
+ self.iterators_w[index] = None
+
+ objects_w[index] = w_value
+ return space.newtuple(objects_w)
+
+ at unwrap_spec(ObjSpace, W_Root, Arguments)
+def W_IZipLongest___new__(space, w_subtype, __args__):
+ kwds = __args__.keywords
+ w_fillvalue = space.w_None
+ if kwds:
+ if kwds[0] == "fillvalue" and len(kwds) == 1:
+ w_fillvalue = __args__.keywords_w[0]
+ else:
+ raise OperationError(space.w_TypeError, space.wrap(
+ "izip_longest() got unexpected keyword argument"))
+
+ self = W_IZipLongest(space, space.w_None, __args__.arguments_w)
+ self.w_fillvalue = w_fillvalue
+ self.active = len(self.iterators_w)
+
+ return space.wrap(self)
+
+W_IZipLongest.typedef = TypeDef(
+ 'izip_longest',
+ __new__ = interp2app(W_IZipLongest___new__),
+ __iter__ = interp2app(W_IZipLongest.iter_w, unwrap_spec=['self']),
+ next = interp2app(W_IZipLongest.next_w, unwrap_spec=['self']),
+ __doc__ = """Return an izip_longest object whose .next() method returns a tuple where
+ the i-th element comes from the i-th iterable argument. The .next()
+ method continues until the longest iterable in the argument sequence
+ is exhausted and then it raises StopIteration. When the shorter iterables
+ are exhausted, the fillvalue is substituted in their place. The fillvalue
+ defaults to None or can be specified by a keyword argument.
+ """)
+
+
class W_Cycle(Wrappable):
def __init__(self, space, w_iterable):
Modified: pypy/branch/fast-forward/pypy/module/itertools/test/test_itertools.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/itertools/test/test_itertools.py (original)
+++ pypy/branch/fast-forward/pypy/module/itertools/test/test_itertools.py Tue Sep 14 16:16:50 2010
@@ -626,3 +626,85 @@
a, b = itertools.tee(iter('abc'))
ref = weakref.ref(b)
assert ref() is b
+
+ def test_iziplongest(self):
+ from itertools import izip_longest, islice, count
+ for args in [
+ ['abc', range(6)],
+ [range(6), 'abc'],
+ [range(100), range(200,210), range(300,305)],
+ [range(100), range(0), range(300,305), range(120), range(150)],
+ [range(100), range(0), range(300,305), range(120), range(0)],
+ ]:
+ # target = map(None, *args) <- this raises a py3k warning
+ # this is the replacement:
+ target = [tuple([arg[i] if i < len(arg) else None for arg in args])
+ for i in range(max(map(len, args)))]
+ assert list(izip_longest(*args)) == target
+ assert list(izip_longest(*args, **{})) == target
+
+ # Replace None fills with 'X'
+ target = [tuple((e is None and 'X' or e) for e in t) for t in target]
+ assert list(izip_longest(*args, **dict(fillvalue='X'))) == target
+
+ # take 3 from infinite input
+ assert (list(islice(izip_longest('abcdef', count()),3)) ==
+ zip('abcdef', range(3)))
+
+ assert list(izip_longest()) == zip()
+ assert list(izip_longest([])) == zip([])
+ assert list(izip_longest('abcdef')) == zip('abcdef')
+
+ assert (list(izip_longest('abc', 'defg', **{})) ==
+ zip(list('abc') + [None], 'defg')) # empty keyword dict
+ raises(TypeError, izip_longest, 3)
+ raises(TypeError, izip_longest, range(3), 3)
+
+ for stmt in [
+ "izip_longest('abc', fv=1)",
+ "izip_longest('abc', fillvalue=1, bogus_keyword=None)",
+ ]:
+ try:
+ eval(stmt, globals(), locals())
+ except TypeError:
+ pass
+ else:
+ self.fail('Did not raise Type in: ' + stmt)
+
+ def test_izip_longest2(self):
+ import itertools
+ class Repeater(object):
+ # this class is similar to itertools.repeat
+ def __init__(self, o, t, e):
+ self.o = o
+ self.t = int(t)
+ self.e = e
+ def __iter__(self): # its iterator is itself
+ return self
+ def next(self):
+ if self.t > 0:
+ self.t -= 1
+ return self.o
+ else:
+ raise self.e
+
+ # Formerly this code in would fail in debug mode
+ # with Undetected Error and Stop Iteration
+ r1 = Repeater(1, 3, StopIteration)
+ r2 = Repeater(2, 4, StopIteration)
+ def run(r1, r2):
+ result = []
+ for i, j in itertools.izip_longest(r1, r2, fillvalue=0):
+ result.append((i, j))
+ return result
+ assert run(r1, r2) == [(1,2), (1,2), (1,2), (0,2)]
+
+ # Formerly, the RuntimeError would be lost
+ # and StopIteration would stop as expected
+ r1 = Repeater(1, 3, RuntimeError)
+ r2 = Repeater(2, 4, StopIteration)
+ it = itertools.izip_longest(r1, r2, fillvalue=0)
+ assert it.next() == (1, 2)
+ assert it.next() == (1, 2)
+ assert it.next()== (1, 2)
+ raises(RuntimeError, it.next)
More information about the Pypy-commit
mailing list