[Python-ideas] Add `Executor.filter`
Ram Rachum
ram at rachum.com
Sat May 2 11:25:30 CEST 2015
Okay, I implemented it. Might be getting something wrong because I've never
worked with the internals of this module before. See attached file for a
demonstration, and here's the code for just the method:
def filter(self, fn, iterable, timeout=None):
if timeout is not None:
end_time = timeout + time.time()
items_and_futures = [
(item, self.submit(fn, item)) for item in iterable
]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
for item, future in items_and_futures:
if timeout is None:
result = future.result()
else:
result = future.result(end_time - time.time())
if result:
yield item
finally:
for _, future in items_and_futures:
future.cancel()
return result_iterator()
On Sat, May 2, 2015 at 1:39 AM, Andrew Barnert <abarnert at yahoo.com> wrote:
> On May 1, 2015, at 08:13, Ram Rachum <ram at rachum.com> wrote:
>
> I envisioned it being implemented directly on `Executor`, so it'll
> automatically apply to all executor types. (I'll be happy to write the
> implementation if we have a general feeling that this is a desired feature.)
>
>
> I'd say just write it if you want it. If it turns out to be so trivial
> everyone decides it's unnecessary to add, you've only wasted 10 minutes. If
> it turns out to be tricky enough to take more time, that in itself will be
> a great argument that it should be added so users don't screw it up
> themselves.
>
> Plus, of course, even if it gets rejected, you'll have the code you want
> for your own project. :)
>
>
> On Fri, May 1, 2015 at 6:08 PM, Guido van Rossum <guido at python.org> wrote:
>
>> Sounds like should be an easy patch. Of course, needs to work for
>> ProcessPoolExecutor too.
>>
>> On Fri, May 1, 2015 at 1:12 AM, Ram Rachum <ram at rachum.com> wrote:
>>
>>> Hi,
>>>
>>> What do you think about adding a method: `Executor.filter`?
>>>
>>> I was using something like this:
>>>
>>> my_things = [thing for thing in things if some_condition(thing)]
>>>
>>>
>>> But the problem was that `some_condition` took a long time to run
>>> waiting on I/O, which is a great candidate for parallelizing with
>>> ThreadPoolExecutor. I made it work using `Executor.map` and some
>>> improvizing, but it would be nicer if I could do:
>>>
>>> with concurrent.futures.ThreadPoolExecutor(100) as executor:
>>> my_things = executor.filter(some_condition, things)
>>>
>>> And have the condition run in parallel on all the threads.
>>>
>>> What do you think?
>>>
>>>
>>> Thanks,
>>> Ram.
>>>
>>> _______________________________________________
>>> Python-ideas mailing list
>>> Python-ideas at python.org
>>> https://mail.python.org/mailman/listinfo/python-ideas
>>> Code of Conduct: http://python.org/psf/codeofconduct/
>>>
>>
>>
>>
>> --
>> --Guido van Rossum (python.org/~guido)
>>
>
> _______________________________________________
> Python-ideas mailing list
> Python-ideas at python.org
> https://mail.python.org/mailman/listinfo/python-ideas
> Code of Conduct: http://python.org/psf/codeofconduct/
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/python-ideas/attachments/20150502/b8b1d543/attachment-0001.html>
-------------- next part --------------
import concurrent.futures
import time
import requests
class NiceExecutorMixin:
def filter(self, fn, iterable, timeout=None):
if timeout is not None:
end_time = timeout + time.time()
items_and_futures = [
(item, self.submit(fn, item)) for item in iterable
]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
for item, future in items_and_futures:
if timeout is None:
result = future.result()
else:
result = future.result(end_time - time.time())
if result:
yield item
finally:
for _, future in items_and_futures:
future.cancel()
return result_iterator()
class MyThreadPoolExecutor(NiceExecutorMixin,
concurrent.futures.ThreadPoolExecutor):
pass
def has_wikipedia_page(name):
response = requests.get(
'http://en.wikipedia.org/wiki/%s' % name.replace(' ', '_')
)
return response.status_code == 200
if __name__ == '__main__':
people = (
'Barack Obama', 'Shimon Peres', 'Justin Bieber',
'Some guy I saw on the street', 'Steve Buscemi',
'My first-grade teacher', 'Gandhi'
)
people_who_have_wikipedia_pages = (
'Barack Obama', 'Shimon Peres', 'Justin Bieber', 'Steve Buscemi',
'Gandhi'
)
# assert tuple(filter(has_wikipedia_page, people_who_have_wikipedia_pages)) \
# == people_who_have_wikipedia_pages
with MyThreadPoolExecutor(100) as executor:
executor_filter_result = tuple(
executor.filter(has_wikipedia_page, people)
)
print(executor_filter_result)
assert executor_filter_result == people_who_have_wikipedia_pages
More information about the Python-ideas
mailing list