Re: [Python-ideas] [Python-Dev] minmax() function returning (minimum, maximum) tuple of a sequence

On 10/15/2010 02:04 PM, Arnaud Delobelle wrote:
Yes. There doesn't seem to be a way to generalize min/max so that they handle all the cases without knowing the context. So in a coroutine version of Tal's class, you would need to pass a hint along with the value.

Ron

Am 15.10.2010 22:00, schrieb Ron Adam:
I give up. You see an issue where there is none.

Georg

--
Thus spake the Lord: Thou shalt indent with four spaces. No more, no less. Four shall be the number of spaces thou shalt indent, and the number of thy indenting shall be four. Eight shalt thou not indent, nor either indent thou two, excepting that thou then proceed to four. Tabs are right out.

On 10/15/2010 03:52 PM, Georg Brandl wrote:
Sorry for the delay, I was away for the day... Thanks for trying, Georg; it really wasn't an issue. I was thinking about it from the point of view of: would it be possible to make min and max easier to use in indirect ways? As I found out, those functions depend on both the number of arguments and the context they are used in to do the right thing. Change either and you may get unexpected results. In the example where *args was used... I had left out the function def of min(*args, **kwds), where you would have seen that args was just unpacking the arguments, and not the list object being passed to min. My mistake.

Cheers, Ron
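
A minimal sketch of the behaviour Ron describes, using only the builtin min(); the forward() wrapper name is made up for illustration:

    # min() dispatches on the number of positional arguments it receives:
    print(min([3, 1, 2]))   # 1 -- one argument: treated as an iterable
    print(min(3, 1, 2))     # 1 -- several arguments: compared directly

    def forward(*args, **kwds):
        # *args merely collects and re-unpacks the arguments, so min()
        # still sees the same call shape as the original call.
        return min(*args, **kwds)

    print(forward([3, 1, 2]))   # 1
    print(forward(3, 1, 2))     # 1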

On 10/16/10 07:00, Ron Adam wrote:
There is a way, by using threading and injecting a thread-safe tee into max/min/otherFuncs (over half of the code is just for implementing the thread-safe tee). Using this, there is no need to make any modification to min/max. I suppose it might be possible to convert this to using the new coroutine proposal (though I haven't been following the proposal closely enough). The code is quite slow (and ugly), but it can handle large generators (or infinite generators). Memory shouldn't grow if the functions in *funcs take a more or less similar amount of time (which is true for max and min); if *funcs need to include both very fast and very slow functions at the same time, some more code can be added for load balancing, by stalling faster threads' requests for more items until the slower threads finish.

Pros:
- no modification to max/min

Cons:
- slow, since itertools.tee() is reimplemented in pure Python
- threads are uninterruptible

    import threading, itertools

    class counting_dict(dict):
        """ A thread-safe dict that allows its items to be accessed
            max_access times, after that the item is deleted.

            >>> d = counting_dict(2)
            >>> d['a'] = 'e'
            >>> d['a']
            'e'
            >>> d['a']
            'e'
            >>> d['a']
            Traceback (most recent call last):
              File "<stdin>", line 1, in <module>
              File "<stdin>", line 10, in __getitem__
            KeyError: 'a'
        """
        def __init__(self, max_access):
            self.max_access = max_access
        def __setitem__(self, key, item):
            super().__setitem__(key,
                [item, self.max_access, threading.Lock()]
            )
        def __getitem__(self, key):
            val = super().__getitem__(key)
            item, count, lock = val
            with lock:
                val[1] -= 1
                if val[1] == 0:
                    del self[key]
                return item

    def tee(iterable, n=2):
        """ like itertools.tee(), but thread-safe """
        def _tee():
            for i in itertools.count():
                try:
                    yield cache[i]
                except KeyError:
                    producer_next()
                    yield cache[i]
        def produce(next):
            for i in itertools.count():
                cache[i] = next()
                yield
        produce.lock = threading.Lock()
        def producer_next():
            with produce.lock:
                next(producer); next(producer); next(producer); next(producer)
        cache = counting_dict(n)
        it = iter(iterable)
        producer = produce(it.__next__)
        return [_tee() for _ in range(n)]

    def parallel_reduce(iterable, *funcs):
        class Worker(threading.Thread):
            def __init__(self, source, func):
                super().__init__()
                self.source = source
                self.func = func
            def run(self):
                self.result = self.func(self.source)
        sources = tee(iterable, len(funcs))
        threads = []
        for func, source in zip(funcs, sources):
            thread = Worker(source, func)
            thread.setDaemon(True)
            threads.append(thread)
        for thread in threads:
            thread.start()
        # this lets Ctrl+C work, it doesn't actually terminate
        # currently running threads
        for thread in threads:
            while thread.isAlive():
                thread.join(100)
        return tuple(thread.result for thread in threads)

    # test code
    import random, time
    parallel_reduce([4, 6, 2, 3, 5, 7, 2, 3, 7, 8, 9, 6, 2, 10], min, max)
    l = (random.randint(-1000000000, 1000000000) for _ in range(100000))
    start = time.time(); parallel_reduce(l, min, min, max, min, max); time.time() - start

On Sun, Oct 24, 2010 at 2:59 PM, Lie Ryan <lie.1296@gmail.com> wrote:
This should not require threads.

Here's a bare-bones sketch using generators:

    def reduce_collector(func):
        outcome = None
        while True:
            try:
                val = yield
            except GeneratorExit:
                break
            if outcome is None:
                outcome = val
            else:
                outcome = func(outcome, val)
        raise StopIteration(outcome)

    def parallel_reduce(iterable, funcs):
        collectors = [reduce_collector(func) for func in funcs]
        values = [None for _ in collectors]
        for i, coll in enumerate(collectors):
            try:
                next(coll)
            except StopIteration as err:
                values[i] = err.args[0]
                collectors[i] = None
        for val in iterable:
            for i, coll in enumerate(collectors):
                if coll is not None:
                    try:
                        coll.send(val)
                    except StopIteration as err:
                        values[i] = err.args[0]
                        collectors[i] = None
        for i, coll in enumerate(collectors):
            if coll is not None:
                try:
                    coll.throw(GeneratorExit)
                except StopIteration as err:
                    values[i] = err.args[0]
        return values

    def main():
        it = range(100)
        print(parallel_reduce(it, [min, max]))

    if __name__ == '__main__':
        main()

--
--Guido van Rossum (python.org/~guido)

On 2010-10-25 04:37, Guido van Rossum wrote:
This should not require threads.
Here's a bare-bones sketch using generators:
If you don't care about allowing the funcs to raise StopIteration, this can actually be simplified to:

    def reduce_collector(func):
        try:
            outcome = yield
        except GeneratorExit:
            outcome = None
        else:
            while True:
                try:
                    val = yield
                except GeneratorExit:
                    break
                outcome = func(outcome, val)
        raise StopIteration(outcome)

    def parallel_reduce(iterable, funcs):
        collectors = [reduce_collector(func) for func in funcs]
        values = [None for _ in collectors]
        for coll in collectors:
            next(coll)
        for val in iterable:
            for coll in collectors:
                coll.send(val)
        for i, coll in enumerate(collectors):
            try:
                coll.throw(GeneratorExit)
            except StopIteration as err:
                values[i] = err.args[0]
        return values

More interesting (to me at least) is that this is an excellent example of why I would like to see a version of PEP 380 where "close" on a generator can return a value (AFAICT the version of PEP 380 on http://www.python.org/dev/peps/pep-0380 is not up-to-date and does not mention this possibility, or even link to the heated discussion we had on python-ideas around March/April 2009).

Assuming that "close" on a reduce_collector generator instance returns the value of the StopIteration raised by the "return" statements, we can simplify the code even further:

    def reduce_collector(func):
        try:
            outcome = yield
        except GeneratorExit:
            return None
        while True:
            try:
                val = yield
            except GeneratorExit:
                return outcome
            outcome = func(outcome, val)

    def parallel_reduce(iterable, funcs):
        collectors = [reduce_collector(func) for func in funcs]
        for coll in collectors:
            next(coll)
        for val in iterable:
            for coll in collectors:
                coll.send(val)
        return [coll.close() for coll in collectors]

Yes, this is only saving a few lines, but I find it *much* more readable...

- Jacob

Guido van Rossum wrote:
outcome = func(outcome, val)
I don't think the generator-based approach is equivalent to what Lie Ryan's threaded code does. You are calling max(a, b) 99 times while Lie calls max(items) once. Is it possible to calculate min(items) and max(items) simultaneously using generators? I don't see how... Peter

On Mon, Oct 25, 2010 at 10:10 AM, Peter Otten <__peter__@web.de> wrote:
True. Nevertheless, my point stands: you shouldn't have to use threads to do such concurrent computations over a single-use iterable. Threads are too slow, and since there is no I/O multiplexing they don't offer any advantage.
Is it possible to calculate min(items) and max(items) simultaneously using generators? I don't see how...
No, this is why the reduce-like approach is better for such cases. Otherwise you keep trying to fit a square peg into a round hole.

--
--Guido van Rossum (python.org/~guido)
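
A minimal single-pass sketch of the reduce-like style being recommended here (not code from the thread): min and max take two arguments, so both results can be folded over one shared loop even when the input is a single-use iterator:

    def min_and_max(iterable):
        it = iter(iterable)
        lo = hi = next(it)      # raises StopIteration on an empty iterable
        for x in it:
            lo = min(lo, x)
            hi = max(hi, x)
        return lo, hi

    print(min_and_max(iter([4, 6, 2, 3, 5, 7])))   # (2, 7)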

On 10/26/10 05:53, Guido van Rossum wrote:
except that the max(a, b) approach is an attempt to find a square hole to fit the square peg, while the max([a]) approach is trying to find a round peg to fit the round hole.

Guido van Rossum wrote:
Perhaps I'm missing something, but to my mind that's an awfully complicated solution for such a simple problem. Here's my attempt:

    def multi_reduce(iterable, funcs):
        it = iter(iterable)
        collectors = [next(it)]*len(funcs)
        for i, f in enumerate(funcs):
            x = next(it)
            collectors[i] = f(collectors[i], x)
        return collectors

I've called it multi_reduce rather than parallel_reduce, because it doesn't execute the functions in parallel. By my testing on Python 3.1.1, multi_reduce is consistently ~30% faster than the generator-based solution for lists with 1000 - 10,000,000 items.

So what am I missing? What does your parallel_reduce give us that multi_reduce doesn't?

--
Steven

On Tue, Oct 26, 2010 at 3:10 AM, Steven D'Aprano <steve@pearwood.info> wrote:
You're right, the point I wanted to prove was that generators are better than threads, but the code was based on emulating reduce(). The generalization I was aiming for was that it is convenient to write a generator that does some kind of computation over a sequence of items and returns a result at the end, and then have a driver that feeds a single sequence to a bunch of such generators. This is more powerful once you try to use reduce to compute e.g. the average of the numbers fed to it -- of course you can do it using a function of (state, value), but it is so much easier to write as a loop! (At least for me -- if you do nothing but write Haskell all day, I'm sure it comes naturally. :-)

    def avg():
        total = 0
        count = 0
        try:
            while True:
                value = yield
                total += value
                count += 1
        except GeneratorExit:
            raise StopIteration(total / count)

The essential boilerplate here is

    try:
        while True:
            value = yield
            <use value>
    except GeneratorExit:
        raise StopIteration(<compute result>)

No doubt functional aficionados will snub this, but in Python, this should run much faster than the same thing written as a reduce-ready function, due to the function call overhead (which wasn't a problem in the min/max example, since those are built-ins).

BTW, this episode led me to better understand my objection against reduce() as the universal hammer: my problem with writing avg() using reduce is that the function one feeds into reduce is asymmetric -- its first argument must be some state, e.g. a tuple (total, count), and the second argument must be the next value. This is the moment that my head reliably explodes -- even though it has no problem visualizing reduce() using a *symmetric* function like +, min or max.

Also note that the reduce()-based solution would have to have a separate function to extract the desired result (total / count) from the state (total, count), and for multi_reduce() you would have to supply a separate list of such functions, or use some other hacky approach.

--
--Guido van Rossum (python.org/~guido)
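
For contrast, a sketch (not from the thread) of the reduce()-based avg() Guido is objecting to: the folding function is asymmetric -- state on the left, value on the right -- and a separate step is needed to turn the final state into the answer:

    import functools

    def step(state, value):
        # state is a (total, count) tuple; value is the next item.
        total, count = state
        return total + value, count + 1

    def finish(state):
        total, count = state
        return total / count

    print(finish(functools.reduce(step, [1, 2, 3, 4], (0, 0))))   # 2.5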

On 10/26/2010 11:43 AM, Guido van Rossum wrote:
The more traditional pull or grab (rather than push/receive) version is

    def avg(it):
        total = 0
        count = 0
        for value in it:
            total += value
            count += 1
        return total/count
with corresponding boilerplate. I can see that the receiving-generator version would be handy when you do not really want to package the producer into an iterator (perhaps because the items are needed for other purposes as well) and want to send items to the averager as they are produced, from the point of production.
Not hard:

    def update(pair, item):
        return pair[0]+1, pair[1]+item
Reduce is extremely important as a concept: any function of a sequence (or arbitrarily ordered collection) can be written as a post-processed reduction. In practice, at least for Python, it is better thought of as a widespread template pattern, such as the boilerplate above, than just as a function. This is partly because Python does not have general function expressions (and should not!) and also because Python has high function call overhead (because of signature flexibility).

--
Terry Jan Reedy
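
For example, Terry's update() above becomes an avg() once the reduction's final state is post-processed (a sketch, assuming functools.reduce as the driver):

    import functools

    def update(pair, item):
        return pair[0]+1, pair[1]+item

    def avg(iterable):
        count, total = functools.reduce(update, iterable, (0, 0))
        return total / count    # the post-processing step

    print(avg([1, 2, 3, 4]))    # 2.5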

On 10/26/10 21:10, Steven D'Aprano wrote:
parallel_reduce() is specifically designed for functions with the signature `func([object])` (a function that takes, as its argument, a list of objects). The idea is that you can write your func() iteratively, and parallel_reduce() will somehow handle splitting the work across multiple funcs, as if you had tee()-ed the iterator, but without caching the whole iterable. Maybe max and min are a bad example, as it happens that max and min also have the alternative signature `func(int, int)`, which makes them a better fit for the traditional reduce() approach (and, as it happens, parallel_reduce() seems to be a bad name as well, since it's not related to the traditional reduce() in any way). And I believe you do miss something:
I don't think that my original attempt with threading is an ideal solution either; as Guido stated, it's too complicated for such a simple problem. cProfile even shows that 30% of its time is spent waiting to acquire locks.

The ideal solution would probably require a way for a function to interrupt its own execution (when the teed iterator is exhausted, but there are still items in the iterable), let other parts of the code continue (the iterator feeder and the other funcs), and then resume where it left off (which is why I think cofunctions are probably the way to go, assuming I understand correctly what a cofunction is).

In diagram form, producer creates a few funcs and feeds each of them a suspendable teed iterator drawn from a single "true iterator":

                         +--------+   +------------+          true iterator
                     +---| func_1 |---| iterator_1 |--[1, ...]   [1, 2, ..]
                     |   +--------+   +------------+
                     |
     +**********+    |   +--------+   +------------+
     * producer *----+---| func_2 |---| iterator_2 |--[1, ...]
     +**********+    |   +--------+   +------------+
                     |
                     |   +--------+   +------------+
                     +---| func_3 |---| iterator_3 |--[1, ...]
                         +--------+   +------------+

Execution then proceeds as follows:

- First, func_1 is executed, and iterator_1 produces item 1; then iterator_1 suspends execution, giving control back to producer.
- producer gives execution to func_2; func_2 processes item 1, then iterator_2 suspends and gives control back to producer.
- Now it's func_3's turn: func_3 processes item 1, then iterator_3 suspends and gives control back to producer.
- All funcs have now consumed item 1, so producer advances (next()-s) the "true iterator" and feeds the new item to the teed iterators.
- producer resumes func_1, which processes item 2; then the same thing happens for func_2 and func_3, and this repeats until the "true iterator" is exhausted.

When the true iterator is exhausted, producer signals iterator_1, iterator_2, and iterator_3 so that they raise StopIteration, causing func_1, func_2, and func_3 to return a result. producer collects the results into a list and returns it to its caller.

Basically, it is a form of cooperative multithreading where iterator_xxx (instead of func_xxx) decides when to suspend the execution of func_xxx (in this particular case, when its own cache is exhausted but there are still items in the true iterator). The advantage is that func_1, func_2, and func_3 can be written iteratively (i.e. as func([object])), as opposed to the reduce-like approach. If performance is important, iterator_xxx can feed multiple items to func_xxx before suspending. Also, it should require no locking, since object sharing and suspending execution are controlled by iterator_xxx (instead of by indeterministic preemptive threading).

participants (9):
- Georg Brandl
- Guido van Rossum
- Jacob Holm
- Lie Ryan
- Peter Otten
- Raymond Hettinger
- Ron Adam
- Steven D'Aprano
- Terry Reedy