[Python-ideas] Add `Executor.filter`

Ram Rachum ram at rachum.com
Sat May 2 11:25:30 CEST 2015


Okay, I implemented it. I might be getting something wrong, because I've
never worked with the internals of this module before. See the attached file
for a demonstration; here's the code for just the method:

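    def filter(self, fn, iterable, timeout=None):
        # Like the built-in filter(), but `fn` runs on the executor's workers.
        # Needs `import time` at module level.
        if timeout is not None:
            # Absolute deadline shared by all of the results.
            end_time = timeout + time.time()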

        items_and_futures = [
            (item, self.submit(fn, item)) for item in iterable
        ]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for item, future in items_and_futures:
                    if timeout is None:
                        result = future.result()
                    else:
                        result = future.result(end_time - time.time())
                    if result:
                        yield item
            finally:
                for _, future in items_and_futures:
                    future.cancel()
        return result_iterator()
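
For comparison, the `Executor.map` workaround mentioned in the quoted message
below could look roughly like this. It is only a minimal sketch, not the code
from my project: `filter_with_map` and `is_even` are hypothetical names made up
for illustration, and unlike the method above it returns a list instead of a
lazy iterator.

```python
import concurrent.futures


def filter_with_map(executor, fn, iterable, timeout=None):
    """Hypothetical helper: keep the items for which fn(item) is true."""
    # Materialize the items so each one can be paired with its result.
    items = list(iterable)
    # Executor.map submits every call up front and yields the results
    # in the same order as the inputs.
    results = executor.map(fn, items, timeout=timeout)
    return [item for item, keep in zip(items, results) if keep]


def is_even(n):
    # Stand-in for a slow, I/O-bound predicate (an assumption for the demo).
    return n % 2 == 0


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(4) as executor:
        print(filter_with_map(executor, is_even, range(10)))
```

The pairing with `zip` is the part that is easy to get subtly wrong by hand,
which is one argument for having a ready-made `filter` on the executor.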


On Sat, May 2, 2015 at 1:39 AM, Andrew Barnert <abarnert at yahoo.com> wrote:

> On May 1, 2015, at 08:13, Ram Rachum <ram at rachum.com> wrote:
>
> I envisioned it being implemented directly on `Executor`, so it'll
> automatically apply to all executor types. (I'll be happy to write the
> implementation if we have a general feeling that this is a desired feature.)
>
>
> I'd say just write it if you want it. If it turns out to be so trivial
> everyone decides it's unnecessary to add, you've only wasted 10 minutes. If
> it turns out to be tricky enough to take more time, that in itself will be
> a great argument that it should be added so users don't screw it up
> themselves.
>
> Plus, of course, even if it gets rejected, you'll have the code you want
> for your own project. :)
>
>
> On Fri, May 1, 2015 at 6:08 PM, Guido van Rossum <guido at python.org> wrote:
>
>> Sounds like it should be an easy patch. Of course, it needs to work for
>> ProcessPoolExecutor too.
>>
>> On Fri, May 1, 2015 at 1:12 AM, Ram Rachum <ram at rachum.com> wrote:
>>
>>> Hi,
>>>
>>> What do you think about adding a method: `Executor.filter`?
>>>
>>> I was using something like this:
>>>
>>> my_things = [thing for thing in things if some_condition(thing)]
>>>
>>>
>>> But the problem was that `some_condition` took a long time to run
>>> waiting on I/O, which is a great candidate for parallelizing with
>>> ThreadPoolExecutor. I made it work using `Executor.map` and some
>>> improvising, but it would be nicer if I could do:
>>>
>>> with concurrent.futures.ThreadPoolExecutor(100) as executor:
>>>     my_things = executor.filter(some_condition, things)
>>>
>>> And have the condition run in parallel on all the threads.
>>>
>>> What do you think?
>>>
>>>
>>> Thanks,
>>> Ram.
>>>
>>> _______________________________________________
>>> Python-ideas mailing list
>>> Python-ideas at python.org
>>> https://mail.python.org/mailman/listinfo/python-ideas
>>> Code of Conduct: http://python.org/psf/codeofconduct/
>>>
>>
>>
>>
>> --
>> --Guido van Rossum (python.org/~guido)
>>
>
-------------- next part --------------
import concurrent.futures
import time
import requests

class NiceExecutorMixin:
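    def filter(self, fn, iterable, timeout=None):
        # Like the built-in filter(), but `fn` runs on the executor's workers.
        if timeout is not None:
            # Absolute deadline shared by all of the results.
            end_time = timeout + time.time()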

        items_and_futures = [
            (item, self.submit(fn, item)) for item in iterable
        ]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for item, future in items_and_futures:
                    if timeout is None:
                        result = future.result()
                    else:
                        result = future.result(end_time - time.time())
                    if result:
                        yield item
            finally:
                for _, future in items_and_futures:
                    future.cancel()
        return result_iterator()


        
class MyThreadPoolExecutor(NiceExecutorMixin,
                           concurrent.futures.ThreadPoolExecutor):
    pass

def has_wikipedia_page(name):
    response = requests.get(
        'http://en.wikipedia.org/wiki/%s' % name.replace(' ', '_')
    )
    return response.status_code == 200
    

if __name__ == '__main__':
    
    people = (
        'Barack Obama', 'Shimon Peres', 'Justin Bieber',
        'Some guy I saw on the street', 'Steve Buscemi',
        'My first-grade teacher', 'Gandhi'
    )
    people_who_have_wikipedia_pages = (
        'Barack Obama', 'Shimon Peres', 'Justin Bieber', 'Steve Buscemi',
        'Gandhi'
    )
    # assert tuple(filter(has_wikipedia_page, people_who_have_wikipedia_pages)) \
    #     == people_who_have_wikipedia_pages
    with MyThreadPoolExecutor(100) as executor:
        executor_filter_result = tuple(
            executor.filter(has_wikipedia_page, people)
        )
        print(executor_filter_result)
        assert executor_filter_result == people_who_have_wikipedia_pages
        

