
On 24 June 2015 at 10:00, Adam Bartoš <drekin@gmail.com> wrote:
> I had a generator producing pairs of values and wanted to feed all the first members of the pairs to one consumer and all the second members to another consumer. For example:
>
>     def pairs():
>         for i in range(4):
>             yield (i, i ** 2)
>
>     biconsumer(sum, list)(pairs()) -> (6, [0, 1, 4, 9])
>
> The point is I wanted the consumers to be suspended and resumed in a coordinated manner: the first consumer is invoked and wants the first element. The coordinator implemented by the biconsumer function invokes pairs(), gets the first pair and yields its first member to the first consumer. The first consumer then wants the next element, but now it's the second consumer's turn, so the first consumer is suspended and the second consumer is invoked and fed the second member of the first pair. Then the second consumer wants the next element, but it's the first consumer's turn… and so on. In the end, when the stream of pairs is exhausted, StopIteration is thrown to both consumers and their results are combined.
Unfortunately this is not possible with generators or with coroutines. Remember that the async coroutine stuff doesn't actually add any fundamental new capability to the language. It's really just a cleaner syntax for a particular way of using generators. Anything you can do with coroutines is also possible with generators (hence 3.4's asyncio does all its stuff with ordinary generators). The problem is fundamental: iterable consumers like sum, list etc. drive the flow control of execution. You can suspend them by feeding in a generator, but they aren't really suspended the way a generator is, because the consumer remains at the base of the call stack.

If you can rewrite the consumers, though, it is possible to recast them fairly simply as generators so that you can push values in, suspending after each push. Suppose I have a consumer function that looks like:

    def func(iterable):
        <init>
        for x in iterable:
            <block>
        return <expr>

I can rewrite it as a feed-in generator like so:

    def gfunc():
        <init>
        yield lambda: <expr>
        while True:
            x = yield
            <block>

When I call this function I get a generator. I can call next on that generator to get a result function. I can then push values in with the send method. When I'm done pushing values in I can call the result function to get the final result. Example:
    >>> def gsum():
    ...     total = 0
    ...     yield lambda: total
    ...     while True:
    ...         x = yield
    ...         total += x
    ...
    >>> summer = gsum()
    >>> result = next(summer)
    >>> next(summer)  # Advance to the first yield inside the loop
    >>> summer.send(1)
    >>> summer.send(2)
    >>> summer.send(3)
    >>> result()
    6
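To show how this gives you the interleaved behaviour from the original example, here is a second consumer written in the same feed-in style (glist is my own hypothetical name, not from the thread), driven in lockstep with gsum by hand:

```python
def gsum():
    total = 0
    yield lambda: total
    while True:
        x = yield
        total += x

# Hypothetical list-building consumer in the same feed-in style.
def glist():
    items = []
    yield lambda: items
    while True:
        x = yield
        items.append(x)

def pairs():
    for i in range(4):
        yield (i, i ** 2)

# Drive both consumers in lockstep by hand: each value is pushed in
# and the receiving generator suspends immediately afterwards.
summer, lister = gsum(), glist()
sum_result, list_result = next(summer), next(lister)
next(summer)  # advance to the first yield inside the loop
next(lister)
for a, b in pairs():
    summer.send(a)
    lister.send(b)
print(sum_result(), list_result())  # 6 [0, 1, 4, 9]
```

This reproduces the biconsumer(sum, list)(pairs()) -> (6, [0, 1, 4, 9]) behaviour asked for at the top of the thread, just without the wrapper yet.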
You can make a decorator to handle the awkwardness of calling the generator and next-ing it. Also you can use the decorator to provide a consumer function with the inverted consumer behaviour as an attribute:

    import functools

    def inverted_consumer(func):
        @functools.wraps(func)
        def consumer(iterable):
            push, result = inverted()
            for x in iterable:
                push(x)
            return result()

        def inverted():
            gen = func()
            try:
                result = next(gen)
                next(gen)
            except StopIteration:
                raise RuntimeError
            return gen.send, result

        consumer.invert = inverted
        return consumer

    @inverted_consumer
    def mean():
        total = 0
        count = 0
        yield lambda: total / count
        while True:
            x = yield
            total += x
            count += 1

    print(mean([4, 5, 6]))  # prints 5.0

    push, result = mean.invert()
    push(4)
    push(5)
    push(6)
    print(result())  # Also prints 5.0

Having implemented your consumer functions in this way you can use them normally, but you can also implement the biconsumer functionality that you wanted (with an obvious generalisation to an N-consumer function):

    def biconsumer(consumer1, consumer2, iterable):
        push1, result1 = consumer1.invert()
        push2, result2 = consumer2.invert()
        for val1, val2 in iterable:
            push1(val1)
            push2(val2)
        return result1(), result2()

Given some of the complaints about two colours of functions in other posts in this thread, perhaps asyncio could take a similar approach. There could be a decorator so that I could define an async function with:

    @sync_callable
    def func(...):
        ...

Then in asynchronous code I could call it as

    x = await func()

or in synchronous code it would be

    x = func.sync_call()

Presumably the sync_call version would fire up an event loop and run the function until complete. Perhaps it could also take other arguments and have a signature like:

    def sync_call_wrapper(args, kwargs, *, loop=None, timeout=None):
        ...

I'm not sure how viable this is given that different asynchronous functions might need different event loops etc., but maybe there's some sensible way to do it.

--
Oscar
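For what it's worth, a minimal sketch of that decorator idea is below. Everything here is hypothetical: sync_callable and sync_call are the names proposed above, not real asyncio APIs, and the loop/timeout handling is just one guess at the semantics.

```python
import asyncio
import functools

def sync_callable(func):
    """Hypothetical decorator: attach a .sync_call to a coroutine
    function that runs it to completion on an event loop."""
    @functools.wraps(func)
    def sync_call(*args, loop=None, timeout=None, **kwargs):
        # If no loop is supplied, fire up a private one and close it after.
        owns_loop = loop is None
        if owns_loop:
            loop = asyncio.new_event_loop()
        try:
            coro = func(*args, **kwargs)
            if timeout is not None:
                coro = asyncio.wait_for(coro, timeout)
            return loop.run_until_complete(coro)
        finally:
            if owns_loop:
                loop.close()
    func.sync_call = sync_call
    return func  # unchanged, so `await func()` still works in async code

@sync_callable
async def double(x):
    await asyncio.sleep(0)
    return x * 2

print(double.sync_call(21))  # 42
```

Since the decorator returns the original function with only an attribute added, asynchronous callers still write `x = await double(21)` while synchronous callers write `x = double.sync_call(21)`; whether sharing or creating loops like this is sound in general is exactly the open question above.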