[Python-ideas] [Python-Dev] minmax() function returning (minimum, maximum) tuple of a sequence

Lie Ryan lie.1296 at gmail.com
Sun Oct 24 23:59:16 CEST 2010


On 10/16/10 07:00, Ron Adam wrote:
> 
> 
> On 10/15/2010 02:04 PM, Arnaud Delobelle wrote:
> 
>>> Because it would always interpret a list of values as a single item.
>>>
>>> This function looks at args and if its a single value without an
>>> "__iter__"
>>> method, it passes it to min as min([value], **kwds) instead of
>>> min(value,
>>> **kwds).
>>
>> But there are many iterable objects which are also comparable (hence
>> it makes sense to consider their min/max), for example strings.
>>
>> So we get:
>>
>>       xmin("foo", "bar", "baz") == "bar"
>>       xmin("foo", "bar") == "bar"
>>
>> but:
>>
>>      xmin("foo") == "f"
>>
>> This will create havoc in your running min routine.
>>
>> (Notice the same will hold for min() but at least you know that min(x)
>> considers x as an iterable and complains if it isn't)
> 
> Yes
> 
> There doesn't seem to be a way to generalize min/max in a way to handle
> all the cases without knowing the context.
> 
> So in a coroutine version of Tals class, you would need to pass a hint
> along with the value.
> 
> Ron

There is a way, by using threading, and injecting a thread-safe tee into
max/min/otherFuncs (over half of the code is just for implementing
thread-safe tee). Using this, there is no need to make any modification
to min/max. I suppose it might be possible to convert this to using the
new coroutine proposal (though I haven't been following the proposal
close enough).

The code is quite slow (and ugly), but it can handle large generators
(or infinite generators). The memory shouldn't grow if the functions in
*funcs takes more or less similar amount of time (which is true in case
of max and min); if *funcs need to take both very fast and very slow
codes at the same time, some more code can be added for load-balancing
by stalling faster threads' request for more items, until the slower
threads finishes.

Pros:
- no modification to max/min

Cons:
- slow, since itertools.tee() is reimplemented in pure-python
- thread is uninterruptible


import threading, itertools

class counting_dict(dict):
    """ A thread-safe dict that allows its items to be accessed
        max_access times, after that the item is deleted.

        >>> d = counting_dict(2)
        >>> d['a'] = 'e'
        >>> d['a']
        'e'
        >>> d['a']
        'e'
        >>> d['a']
        Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
          File "<stdin>", line 10, in __getitem__
        KeyError: 'a'
    """
    def __init__(self, max_access):
        self.max_access = max_access
    def __setitem__(self, key, item):
        super().__setitem__(key,
            [item, self.max_access, threading.Lock()]
        )
    def __getitem__(self, key):
        val = super().__getitem__(key)
        item, count, lock = val
        with lock:
            val[1] -= 1
            if val[1] == 0: del self[key]
        return item

def tee(iterable, n=2):
    """ like itertools.tee(), but thread-safe """
    def _tee():
        for i in itertools.count():
            try:
                yield cache[i]
            except KeyError:
                producer_next()
                yield cache[i]
    def produce(next):
        for i in itertools.count():
            cache[i] = next()
            yield
    produce.lock = threading.Lock()

    def producer_next():
        with produce.lock:
            next(producer); next(producer);
            next(producer); next(producer);

    cache = counting_dict(n)
    it = iter(iterable)
    producer = produce(it.__next__)
    return [_tee() for _ in range(n)]

def parallel_reduce(iterable, *funcs):
    class Worker(threading.Thread):
        def __init__(self, source, func):
            super().__init__()
            self.source = source
            self.func = func
        def run(self):
            self.result = self.func(self.source)

    sources = tee(iterable, len(funcs))
    threads = []
    for func, source in zip(funcs, sources):
        thread = Worker(source, func)
        thread.setDaemon(True)
        threads.append(thread)

    for thread in threads:
        thread.start()

    # this lets Ctrl+C work, it doesn't actually terminate
    # currently running threads
    for thread in threads:
        while thread.isAlive():
            thread.join(100)

    return tuple(thread.result for thread in threads)

# test code
import random, time
parallel_reduce([4, 6, 2, 3, 5, 7, 2, 3, 7, 8, 9, 6, 2, 10], min, max)
l = (random.randint(-1000000000, 1000000000) for _ in range(100000))
start = time.time(); parallel_reduce(l, min, min, max, min, max);
time.time() - start




More information about the Python-ideas mailing list