[Python-Dev] futures API
Brian Quinlan
brian at sweetapp.com
Sat Dec 11 21:53:15 CET 2010
On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:
> --- On Fri, 10/12/10, Brian Quinlan wrote:
>> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
>>> --- On Fri, 10/12/10, Brian Quinlan wrote:
>>>> On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote:
>>>>> I have a process running for a long time, which may use futures of
>>>>> different max_workers count. I think it is not too far-fetched to
>>>>> create a new futures object each time. Yet, the execution becomes
>>>>> slower after each call, for example with
>>>>> http://freehackers.org/~tnagy/futures_test.py:
>>>>>
>>>>> """
>>>>> import concurrent.futures
>>>>> from queue import Queue
>>>>> import datetime
>>>>>
>>>>> class counter(object):
>>>>>     def __init__(self, fut):
>>>>>         self.fut = fut
>>>>>
>>>>>     def run(self):
>>>>>         def look_busy(num, obj):
>>>>>             tot = 0
>>>>>             for x in range(num):
>>>>>                 tot += x
>>>>>             obj.out_q.put(tot)
>>>>>
>>>>>         start = datetime.datetime.utcnow()
>>>>>         self.count = 0
>>>>>         self.out_q = Queue(0)
>>>>>         for x in range(1000):
>>>>>             self.count += 1
>>>>>             self.fut.submit(look_busy, self.count, self)
>>>>>
>>>>>         while self.count:
>>>>>             self.count -= 1
>>>>>             self.out_q.get()
>>>>>
>>>>>         delta = datetime.datetime.utcnow() - start
>>>>>         print(delta.total_seconds())
>>>>>
>>>>> fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>> for x in range(100):
>>>>>     # comment the following line
>>>>>     fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>>     c = counter(fut)
>>>>>     c.run()
>>>>> """
>>>>>
>>>>> The runtime grows after each step:
>>>>> 0.216451
>>>>> 0.225186
>>>>> 0.223725
>>>>> 0.222274
>>>>> 0.230964
>>>>> 0.240531
>>>>> 0.24137
>>>>> 0.252393
>>>>> 0.249948
>>>>> 0.257153
>>>>> ...
>>>>>
>>>>> Is there a mistake in this piece of code?
>>>>
>>>> There is no mistake that I can see, but I suspect that the circular
>>>> references that you are building are causing the ThreadPoolExecutor
>>>> to take a long time to be collected. Try adding:
>>>>
>>>> c = counter(fut)
>>>> c.run()
>>>> + fut.shutdown()
>>>>
>>>> Even if that fixes your problem, I still don't fully understand this,
>>>> because I would expect the runtime to fall after a while as
>>>> ThreadPoolExecutors are collected.
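For later readers: an executor can also be used as a context manager, and leaving the `with` block calls `shutdown(wait=True)`, which has the same effect as the explicit `fut.shutdown()` suggested above. The following is a minimal sketch of the benchmark restructured that way for current Python 3. `run_once()` is a hypothetical helper, and results are collected from the returned futures instead of through a shared `Queue`, so the work items no longer hold a reference back to a counter object.

```python
import concurrent.futures
import time


def look_busy(num):
    # CPU-bound busy work, mirroring look_busy() in the quoted test script.
    tot = 0
    for x in range(num):
        tot += x
    return tot


def run_once(max_workers=20, n_items=1000):
    """Time one batch on a fresh executor that is always shut down."""
    start = time.perf_counter()
    # Exiting the with-block calls executor.shutdown(wait=True), so this
    # executor's worker threads are joined before the next one is created.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(look_busy, n) for n in range(1, n_items + 1)]
        results = [f.result() for f in futures]
    elapsed = time.perf_counter() - start
    return elapsed, results


if __name__ == "__main__":
    for _ in range(10):
        elapsed, _results = run_once()
        print(f"{elapsed:.6f}")
```

With every executor shut down, the per-run timings printed here are expected to stay roughly flat instead of creeping upward, although exact numbers depend on the machine.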
>>>
>>> The shutdown call is indeed a good fix :-) Here is the time response
>>> of the calls to counter() when shutdown is not called:
>>> http://www.freehackers.org/~tnagy/runtime_futures.png
>>
>> FWIW, I think that you are confusing the term "future" with
>> "executor". A future represents a single work item. An executor
>> creates futures and schedules their underlying work.
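To make the distinction concrete, here is a small sketch: the executor accepts a callable and schedules it, and `submit()` hands back one `Future` per work item. `square()` is just an illustrative function.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def square(n):
    return n * n


with ThreadPoolExecutor(max_workers=2) as executor:
    # The executor takes the callable and schedules it on a worker thread;
    # submit() immediately returns a Future for that one work item.
    future = executor.submit(square, 7)
    print(isinstance(future, Future))  # True
    # The Future only tracks this single call; result() blocks until it is done.
    value = future.result()

print(value, future.done())  # 49 True
```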
>
> Ah yes, sorry. I have also realized that the executor is not the
> killer feature I was expecting; it can only replace a small part of
> the code I have. Controlling the exceptions and the workflow is the
> most complicated part.
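On the exception side, part of this is handled by futures: an exception raised inside a work item is stored on its `Future`, where `exception()` returns it and `result()` re-raises it, while `as_completed()` yields futures as they finish. The following sketch collects successes and failures separately. `parse()` is a hypothetical work item, and this covers only error collection, not the wider workflow control described above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse(text):
    # Hypothetical work item: raises ValueError for non-numeric input.
    return int(text)


inputs = ["1", "2", "three", "4"]
results, errors = {}, {}

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to the input that produced it.
    future_to_input = {executor.submit(parse, s): s for s in inputs}
    for future in as_completed(future_to_input):
        source = future_to_input[future]
        # The worker's exception is captured on the future rather than
        # being raised in the worker thread.
        exc = future.exception()
        if exc is not None:
            errors[source] = exc
        else:
            results[source] = future.result()

print(sorted(results.items()))  # [('1', 1), ('2', 2), ('4', 4)]
print({k: type(v).__name__ for k, v in errors.items()})  # {'three': 'ValueError'}
```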
>
> I have also observed a minor performance degradation with the
> executor replacement (3 seconds for 5000 work items). The number of
> work items processed per unit of time does not seem to be a straight
> line: http://www.freehackers.org/~tnagy/runtime_futures_2.png
That looks pretty linear to me.
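One way to check linearity is to time batches of increasing size on a single executor and compare items per second. This is a rough sketch. `work()`, `time_batches()` and the batch sizes are hypothetical, and timings of CPU-bound Python code on threads are noisy because of the GIL.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(n):
    # Small fixed-size busy loop standing in for a real work item.
    return sum(range(n))


def time_batches(batch_sizes, max_workers=20, size=200):
    """Return {batch_size: elapsed_seconds}, reusing one executor."""
    timings = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for count in batch_sizes:
            start = time.perf_counter()
            # list() forces map() to wait for every result in the batch.
            list(executor.map(work, [size] * count))
            timings[count] = time.perf_counter() - start
    return timings


if __name__ == "__main__":
    for count, elapsed in time_batches([1000, 2000, 3000, 4000, 5000]).items():
        # A roughly constant items/s figure means runtime grows linearly.
        print(count, f"{elapsed:.4f}s", f"{count / elapsed:.0f} items/s")
```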
> Out of curiosity, what is the "_thread_references" for?
There is a big comment above it in the code:
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_thread_references = set()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    for thread_reference in _thread_references:
        thread = thread_reference()
        if thread is not None:
            thread.join()
Is it still unclear why it is there? Maybe you could propose some
additional documentation.
Cheers,
Brian
> The source file for the example is in:
> http://www.freehackers.org/~tnagy/futures_test3.py
>
> The diagram was created by:
> http://www.freehackers.org/~tnagy/futures_test3.plot
>
> Thomas