[Python-Dev] futures API
Thomas Nagy
tnagyemail-mail at yahoo.fr
Sun Dec 12 03:32:18 CET 2010
--- On Sat, 11/12/10, Brian Quinlan wrote:
>
> On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:
>
> > --- On Fri, 10/12/10, Brian Quinlan wrote:
> >> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
> >>> --- On Fri, 10/12/10, Brian Quinlan wrote:
> >>>> On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote:
> >>>>> I have a long-running process that may use futures with different max_workers counts. I think it is not too far-fetched to create a new futures object each time. Yet the execution becomes slower after each call, for example with http://freehackers.org/~tnagy/futures_test.py:
> >>>>>
> >>>>> """
> >>>>> import concurrent.futures
> >>>>> from queue import Queue
> >>>>> import datetime
> >>>>>
> >>>>> class counter(object):
> >>>>>     def __init__(self, fut):
> >>>>>         self.fut = fut
> >>>>>
> >>>>>     def run(self):
> >>>>>         def look_busy(num, obj):
> >>>>>             tot = 0
> >>>>>             for x in range(num):
> >>>>>                 tot += x
> >>>>>             obj.out_q.put(tot)
> >>>>>
> >>>>>         start = datetime.datetime.utcnow()
> >>>>>         self.count = 0
> >>>>>         self.out_q = Queue(0)
> >>>>>         for x in range(1000):
> >>>>>             self.count += 1
> >>>>>             self.fut.submit(look_busy, self.count, self)
> >>>>>
> >>>>>         while self.count:
> >>>>>             self.count -= 1
> >>>>>             self.out_q.get()
> >>>>>
> >>>>>         delta = datetime.datetime.utcnow() - start
> >>>>>         print(delta.total_seconds())
> >>>>>
> >>>>> fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
> >>>>> for x in range(100):
> >>>>>     # comment out the following line to reuse a single executor
> >>>>>     fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
> >>>>>     c = counter(fut)
> >>>>>     c.run()
> >>>>> """
> >>>>>
> >>>>> The runtime grows after each step:
> >>>>> 0.216451
> >>>>> 0.225186
> >>>>> 0.223725
> >>>>> 0.222274
> >>>>> 0.230964
> >>>>> 0.240531
> >>>>> 0.24137
> >>>>> 0.252393
> >>>>> 0.249948
> >>>>> 0.257153
> >>>>> ...
> >>>>>
> >>>>> Is there a mistake in this piece of code?
> >>>>
> >>>> There is no mistake that I can see, but I suspect that the circular references that you are building are causing the ThreadPoolExecutor to take a long time to be collected. Try adding:
> >>>>
> >>>>     c = counter(fut)
> >>>>     c.run()
> >>>> +   fut.shutdown()
> >>>>
> >>>> Even if that fixes your problem, I still don't fully understand this, because I would expect the runtime to fall after a while as ThreadPoolExecutors are collected.
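A minimal sketch of the same benchmark rewritten so that each executor is shut down before the next one is created, assuming Python 3.3+ (for time.perf_counter). Using the executor in a with block calls shutdown(wait=True) on exit, which has the same effect as the explicit fut.shutdown() call. The names run_batch and main are illustrative, and results are returned through the futures rather than through a shared Queue.

```python
import concurrent.futures
import time


def look_busy(num):
    # Same busy loop as the original benchmark, but the result is
    # returned through the Future instead of a shared Queue.
    return sum(range(num))


def run_batch(executor, n_items=1000):
    # Submit n_items work items and wait for every one of them to finish.
    futures = [executor.submit(look_busy, i) for i in range(1, n_items + 1)]
    return [f.result() for f in futures]


def main(rounds=5):
    results, timings = [], []
    for _ in range(rounds):
        start = time.perf_counter()
        # Leaving the with block calls executor.shutdown(wait=True), so the
        # worker threads are joined before the next executor is created.
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            results = run_batch(executor)
        timings.append(time.perf_counter() - start)
    return results, timings


if __name__ == "__main__":
    results, timings = main()
    for t in timings:
        print(t)
```

Reusing one executor for every round (creating it once, outside the loop) would also avoid piling up idle thread pools; the with block is simply the tidiest way to guarantee shutdown() is called.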
> >>>
> >>> The shutdown call is indeed a good fix :-) Here are the timings of the calls to counter() when shutdown is not called:
> >>> http://www.freehackers.org/~tnagy/runtime_futures.png
> >>
> >> FWIW, I think that you are confusing the term "future" with "executor". A future represents a single work item. An executor creates futures and schedules their underlying work.
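To make the distinction concrete, a small sketch (square is a hypothetical stand-in for real work): one executor accepts many submissions, and each submit() call returns a separate Future whose result() blocks until that single work item is done.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def square(x):
    # Hypothetical work item.
    return x * x


with ThreadPoolExecutor(max_workers=4) as executor:
    # One executor schedules many work items...
    futures = [executor.submit(square, n) for n in range(5)]
    # ...and every submit() call hands back its own Future.
    assert all(isinstance(f, Future) for f in futures)
    # result() waits for that one work item only.
    squares = [f.result() for f in futures]

print(squares)  # [0, 1, 4, 9, 16]
```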
> >
> > Ah yes, sorry. I have also realized that the executor is not the killer feature I was expecting; it can only replace a small part of my code. Controlling the exceptions and the workflow is the most complicated part.
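The executor does cover one piece of the exception handling: an exception raised inside a work item is stored on its Future instead of propagating in the worker thread. A minimal sketch, where parse and the sample inputs are hypothetical, using as_completed() to handle work items as they finish and Future.exception() to separate successes from failures. How much of a larger workflow this replaces will depend on that workflow.

```python
import concurrent.futures


def parse(text):
    # Hypothetical work item: fails on input that is not an integer.
    return int(text)


inputs = ["1", "2", "oops", "4"]
ok, failed = {}, {}

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future_to_input = {executor.submit(parse, s): s for s in inputs}
    for future in concurrent.futures.as_completed(future_to_input):
        source = future_to_input[future]
        # exception() returns the exception raised by the work item, or None;
        # the worker thread itself keeps running either way.
        exc = future.exception()
        if exc is None:
            ok[source] = future.result()
        else:
            failed[source] = exc

print(sorted(ok.items()))                                # [('1', 1), ('2', 2), ('4', 4)]
print({k: type(v).__name__ for k, v in failed.items()})  # {'oops': 'ValueError'}
```

Calling future.result() on a failed future would instead re-raise the stored exception in the calling thread, which is often the simpler choice when any failure should abort the whole batch.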
> >
> > I have also observed a minor performance degradation with the executor replacement (3 seconds for 5000 work items). The number of work items processed per unit of time does not seem to be a straight line: http://www.freehackers.org/~tnagy/runtime_futures_2.png
>
> That looks pretty linear to me.
Ok.
> > Out of curiosity, what is "_thread_references" for?
>
> There is a big comment above it in the code:
>
> # Workers are created as daemon threads. This is done to allow the interpreter
> # to exit when there are still idle threads in a ThreadPoolExecutor's thread
> # pool (i.e. shutdown() was not called). However, allowing workers to die with
> # the interpreter has two undesirable properties:
> #   - The workers would still be running during interpreter shutdown,
> #     meaning that they would fail in unpredictable ways.
> #   - The workers could be killed while evaluating a work item, which could
> #     be bad if the callable being evaluated has external side-effects e.g.
> #     writing to a file.
> #
> # To work around this problem, an exit handler is installed which tells the
> # workers to exit when their work queues are empty and then waits until the
> # threads finish.
>
> _thread_references = set()
> _shutdown = False
>
> def _python_exit():
>     global _shutdown
>     _shutdown = True
>     for thread_reference in _thread_references:
>         thread = thread_reference()
>         if thread is not None:
>             thread.join()
>
> Is it still unclear why it is there? Maybe you could propose some additional documentation.
I was thinking that if exceptions have to be caught (and that is likely to be the case in general), then this scheme is redundant. Now I see that the threads get their work items from a queue, so it makes sense now.
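A minimal sketch of such a queue-based worker, assuming a None sentinel for stopping and a hypothetical WorkItem class; it uses Future.set_result() and Future.set_exception(), which concurrent.futures documents for use by executor implementations. Catching exceptions per work item keeps the thread alive, but an idle worker still sits blocked on the queue until something tells it to exit, which is the job the quoted exit handler performs.

```python
import queue
import threading
from concurrent.futures import Future


class WorkItem:
    # Hypothetical pairing of a callable with the Future that holds its outcome.
    def __init__(self, fn, *args):
        self.future = Future()
        self.fn = fn
        self.args = args

    def run(self):
        try:
            result = self.fn(*self.args)
        except BaseException as exc:
            # Store the exception on the Future; the worker thread survives.
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)


def worker(work_queue):
    # Pull work items until the None sentinel arrives.
    while True:
        item = work_queue.get()
        if item is None:
            return
        item.run()


work_queue = queue.Queue()
t = threading.Thread(target=worker, args=(work_queue,))
t.start()

good = WorkItem(pow, 2, 10)
bad = WorkItem(int, "not a number")
for item in (good, bad, None):
    work_queue.put(item)
t.join()

print(good.future.result())                   # 1024
print(type(bad.future.exception()).__name__)  # ValueError
```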
Thanks for all the information,
Thomas