Re: [Python-Dev] futures API
--- On Fri, 12/10/10, Brian Quinlan wrote:
On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote:
I have a process running for a long time, and which may use futures of different max_workers count. I think it is not too far-fetched to create a new futures object each time. Yet, the execution becomes slower after each call, for example with http://freehackers.org/~tnagy/futures_test.py:
""" import concurrent.futures from queue import Queue import datetime
class counter(object): def __init__(self, fut): self.fut = fut
def run(self): def look_busy(num, obj):
tot = 0
for x in range(num):
tot += x
obj.out_q.put(tot)
start = datetime.datetime.utcnow() self.count = 0 self.out_q = Queue(0) for x in range(1000):
self.count += 1
self.fut.submit(look_busy, self.count, self)
while self.count:
self.count -= 1
self.out_q.get()
delta = datetime.datetime.utcnow() - start
print(delta.total_seconds())
fut =
concurrent.futures.ThreadPoolExecutor(max_workers=20)
for x in range(100): # comment the following line fut = concurrent.futures.ThreadPoolExecutor(max_workers=20) c = counter(fut) c.run() """
The runtime grows after each step:
0.216451
0.225186
0.223725
0.222274
0.230964
0.240531
0.24137
0.252393
0.249948
0.257153
...
Is there a mistake in this piece of code?
There is no mistake that I can see but I suspect that the circular references that you are building are causing the ThreadPoolExecutor to take a long time to be collected. Try adding:
  c = counter(fut)
  c.run()
+ fut.shutdown()
Even if that fixes your problem, I still don't fully understand this because I would expect the runtime to fall after a while as ThreadPoolExecutors are collected.
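A minimal sketch of the suggested fix (the function name and workload here are illustrative, not from the original test): calling shutdown(wait=True) releases the worker threads deterministically, instead of relying on the executor eventually being garbage-collected.

```python
import concurrent.futures

def run_batch(n):
    """Submit n trivial tasks and wait for all of their results."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    try:
        # Each submit() returns a Future for one work item.
        futures = [executor.submit(pow, 2, 10) for _ in range(n)]
        results = [f.result() for f in futures]
    finally:
        # Explicitly release the worker threads rather than waiting
        # for the executor to be collected.
        executor.shutdown(wait=True)
    return results

print(len(run_batch(100)))
```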
The shutdown call is indeed a good fix :-) Here is the time response of the calls to counter() when shutdown is not called: http://www.freehackers.org/~tnagy/runtime_futures.png

After trying to stop the program by using CTRL+C, the following error may appear, after which the process cannot be interrupted:

"""
19:18:12 /tmp/build> python3.2 futures_test.py
0.389657
0.417173
0.416513
0.421424
0.449666
0.482273
^CTraceback (most recent call last):
  File "futures_test.py", line 36, in <module>
    c.run()
  File "futures_test.py", line 22, in run
    self.fut.submit(look_busy, self.count, self)
  File "/usr/local/lib/python3.2/concurrent/futures/thread.py", line 114, in submit
    self._work_queue.put(w)
  File "/usr/local/lib/python3.2/queue.py", line 135, in put
    self.not_full.acquire()
KeyboardInterrupt
"""

It is not expected, is it?

Thomas
On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
The shutdown call is indeed a good fix :-) Here is the time response of the calls to counter() when shutdown is not called: http://www.freehackers.org/~tnagy/runtime_futures.png
FWIW, I think that you are confusing the term "future" with "executor". A future represents a single work item. An executor creates futures and schedules their underlying work.

Hmmm... that is very suspicious - it looks like the ThreadPoolExecutors are not being collected. If you are feeling bored you could figure out why not :-)
After trying to stop the program by using CTRL+C, the following error may appear, after which the process cannot be interrupted:
""" 19:18:12 /tmp/build> python3.2 futures_test.py 0.389657 0.417173 0.416513 0.421424 0.449666 0.482273 ^CTraceback (most recent call last): File "futures_test.py", line 36, in <module> c.run() File "futures_test.py", line 22, in run self.fut.submit(look_busy, self.count, self) File "/usr/local/lib/python3.2/concurrent/futures/thread.py", line 114, in submit self._work_queue.put(w) File "/usr/local/lib/python3.2/queue.py", line 135, in put self.not_full.acquire() KeyboardInterrupt """
It is not expected, is it?
It isn't surprising. Python lock acquisitions are not interruptible, and anytime you interrupt a program that manipulates locks you may kill the code that was going to cause the lock to be released.

Cheers,
Brian
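The future/executor distinction discussed above can be illustrated with a short sketch using only the public concurrent.futures API: the executor owns the thread pool and schedules work; each submit() call returns a distinct Future representing one work item.

```python
import concurrent.futures

# The *executor* owns the thread pool and schedules work.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Each submit() returns a *future*: a handle for one work item.
    future = executor.submit(sum, range(10))
    print(type(future).__name__)   # a Future, not an executor
    print(future.result())         # blocks until the work item finishes
```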
--- On Fri, 12/10/10, Brian Quinlan wrote:
FWIW, I think that you are confusing the term "future" with "executor". A future represents a single work item. An executor creates futures and schedules their underlying work.
Ah yes, sorry. I have also realized that the executor is not the killer feature I was expecting: it can only replace a small part of the code I have, and controlling the exceptions and the workflow is the most complicated part.

I have also observed a minor performance degradation with the executor replacement (3 seconds for 5000 work items). The amount of work items processed per unit of time does not seem to be a straight line: http://www.freehackers.org/~tnagy/runtime_futures_2.png

Out of curiosity, what is the "_thread_references" for?

The source file for the example is in: http://www.freehackers.org/~tnagy/futures_test3.py
The diagram was created by: http://www.freehackers.org/~tnagy/futures_test3.plot

Thomas
On 12/11/2010 9:44 AM, Thomas Nagy wrote:
The amount of work items processed by unit of time does not seem to be a straight line: http://www.freehackers.org/~tnagy/runtime_futures_2.png . Out of curiosity, what is the "_thread_references" for?
The source file for the example is in: http://www.freehackers.org/~tnagy/futures_test3.py
The diagram was created by: http://www.freehackers.org/~tnagy/futures_test3.plot
Your test code does 50,000,000 list appends. I suspect your benchmark is telling you more about the behavior of large lists than about the overhead of the futures module. You should retry that experiment with the list pre-allocated. Beyond that, the curve in that line does not show a large amount of variance from a straight line.

--
Scott Dial
scott@scottdial.com
scodial@cs.indiana.edu
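A sketch of the kind of comparison being suggested (the sizes and helper names are illustrative, not from the original test): pre-allocating the list replaces the incremental resizing that repeated append() does.

```python
import timeit

def grow_by_append(n):
    # Repeated append: the list is resized incrementally as it grows.
    out = []
    for i in range(n):
        out.append(i)
    return out

def grow_preallocated(n):
    # Pre-allocate once, then fill in place: no resizing during the loop.
    out = [None] * n
    for i in range(n):
        out[i] = i
    return out

n = 100_000
t_append = timeit.timeit(lambda: grow_by_append(n), number=20)
t_prealloc = timeit.timeit(lambda: grow_preallocated(n), number=20)
print(t_append, t_prealloc)
```

Since append() is amortized O(1), the difference is usually modest, but isolating it this way shows how much of the benchmark's curve comes from the list rather than from the futures machinery.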
On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:
Ah yes, sorry. I have also realized that the executor is not the killer feature I was expecting, it can only replace a little part of the code I have: controlling the exceptions and the workflow is the most complicated part.
I have also observed a minor performance degradation with the executor replacement (3 seconds for 5000 work items). The amount of work items processed by unit of time does not seem to be a straight line: http://www.freehackers.org/~tnagy/runtime_futures_2.png .
That looks pretty linear to me.
Out of curiosity, what is the "_thread_references" for?
There is a big comment above it in the code:

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_thread_references = set()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    for thread_reference in _thread_references:
        thread = thread_reference()
        if thread is not None:
            thread.join()

Is it still unclear why it is there? Maybe you could propose some additional documentation.

Cheers,
Brian
_______________________________________________
Python-Dev mailing list
Python-Dev@python.org
http://mail.python.org/mailman/listinfo/python-dev
Unsubscribe: http://mail.python.org/mailman/options/python-dev/brian%40sweetapp.com
Brian Quinlan wrote:
On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:
I have also observed a minor performance degradation with the executor replacement (3 seconds for 5000 work items). The amount of work items processed by unit of time does not seem to be a straight line: http://www.freehackers.org/~tnagy/runtime_futures_2.png .
That looks pretty linear to me.
Close to, but not quite. The graph seems to be slightly curved, with the amount of work done per second decreasing for large amounts of work.

Assuming that this performance degradation is real, and not an artifact of the measurement technique, it seems to be quite small. I'd be happy to describe it as "linear" in the same way we describe dictionary lookups as constant-time, even though technically that's not strictly true. (They're linear in the number of items with a matching hash, and there are probably other complications as well.)

As drawn, the curve seems to fall away like a log graph, which might suggest to the casual viewer that this is a good thing. It may be better to reverse the axes, that is, to have the independent variable, number of tasks, on the horizontal axis, and the dependent variable, cost (time taken), vertically. This will make it clear that the incremental cost of doing one extra task increases (slightly) as the number of tasks goes up.

--
Steven
On Sun, Dec 12, 2010 at 6:53 AM, Brian Quinlan wrote:
Is it still unclear why it is there? Maybe you could propose some additional documentation.
Did you get my question the other day as to whether a weakref.WeakSet might be a better choice? I believe you would be able to get rid of the periodic sweep for dead references if you did that, and I didn't spot any obvious downsides.

Cheers,
Nick.

--
Nick Coghlan | ncoghlan@gmail.com | Brisbane, Australia
On Dec 11, 2010, at 6:33 PM, Nick Coghlan wrote:
On Sun, Dec 12, 2010 at 6:53 AM, Brian Quinlan wrote:
Is it still unclear why it is there? Maybe you could propose some additional documentation.
Did you get my question the other day as to whether a weakref.WeakSet might be a better choice? I believe you would be able to get rid of the periodic sweep for dead references if you did that, and I didn't spot any obvious downsides.
No I didn't, sorry! Could you resend it if it has more details than the paragraph above?

Cheers,
Brian
On Sun, Dec 12, 2010 at 12:36 PM, Brian Quinlan wrote:
On Dec 11, 2010, at 6:33 PM, Nick Coghlan wrote:
On Sun, Dec 12, 2010 at 6:53 AM, Brian Quinlan wrote:
Is it still unclear why it is there? Maybe you could propose some additional documentation.
Did you get my question the other day as to whether a weakref.WeakSet might be a better choice? I believe you would be able to get rid of the periodic sweep for dead references if you did that, and I didn't spot any obvious downsides.
No I didn't, sorry! Could you resend it if it has more details than the paragraph above?
There wasn't much more detail, as there actually isn't a great deal to the idea. This was the main comment in the previous email:

"It seems to me that using WeakSet would mean you could get rid of _remove_dead_thread_references altogether (the implicit callbacks would handle that part), and then a set() call in _python_exit would give you concrete references to work with for cleanup purposes."

Cheers,
Nick.

--
Nick Coghlan | ncoghlan@gmail.com | Brisbane, Australia
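A minimal sketch of the idea (the helper names here are illustrative, not from the futures source): a weakref.WeakSet discards entries for collected threads on its own, and snapshotting it with set() at exit time yields strong references that are safe to join.

```python
import threading
import weakref

# A WeakSet drops entries automatically when the referenced threads are
# garbage-collected, so no periodic sweep of dead references is needed.
_threads = weakref.WeakSet()

def start_worker(target):
    t = threading.Thread(target=target)
    _threads.add(t)
    t.start()
    return t

def join_all():
    # set() takes a snapshot with strong references, so joining is safe
    # even if the WeakSet mutates while we iterate.
    for t in set(_threads):
        t.join()

worker = start_worker(lambda: None)
join_all()
print(worker.is_alive())
```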
--- On Fri, 12/10/10, Thomas Nagy wrote:
After trying to stop the program by using CTRL+C, the following error may appear, after which the process cannot be interrupted:
""" 19:18:12 /tmp/build> python3.2 futures_test.py 0.389657 0.417173 0.416513 0.421424 0.449666 0.482273 ^CTraceback (most recent call last): File "futures_test.py", line 36, in <module> c.run() File "futures_test.py", line 22, in run self.fut.submit(look_busy, self.count, self) File "/usr/local/lib/python3.2/concurrent/futures/thread.py", line 114, in submit self._work_queue.put(w) File "/usr/local/lib/python3.2/queue.py", line 135, in put self.not_full.acquire() KeyboardInterrupt """
It is not expected, is it?
The problem also occurs when using a callback: http://www.freehackers.org/~tnagy/futures_test2.py

If it is necessary to catch KeyboardInterrupt exceptions to cancel the futures execution, then how about adding this detail to the docs?

Thomas
On Dec 10, 2010, at 11:39 AM, Thomas Nagy wrote:
The problem also occurs when using a callback: http://www.freehackers.org/~tnagy/futures_test2.py
If it is necessary to catch KeyboardInterrupt exceptions to cancel the futures execution, then how about adding this detail to the docs?
AFAIK, catching KeyboardInterrupt exceptions is not sufficient.

Cheers,
Brian
On Sat, Dec 11, 2010 at 6:07 AM, Brian Quinlan wrote:
The problem also occurs when using a callback: http://www.freehackers.org/~tnagy/futures_test2.py
If it is necessary to catch KeyboardInterrupt exceptions to cancel the futures execution, then how about adding this detail to the docs?
AFAIK, catching KeyboardInterrupt exceptions is not sufficient.
finally blocks and with statements can get you a fairly long way, though. futures does everything right on this front, as far as I can see.

In this case, the problem is in the design of the test program. It *must* get exactly as many items in the queue as were submitted to the executor. If one is ever missing (e.g. due to a poorly timed KeyboardInterrupt in the callback, as clearly happened in the presented trace), it will hang in the final call to self.out_q.get(). There's a reason Queue.get offers both nonblocking and block-with-timeout functionality. (Alternately, the management of out_q and count could be made smarter, such that an exception in adding a result to out_q triggered an appropriate adjustment in the count of expected values.)

I also agree with Brian that the variable naming in the sample code, and comments like "may use futures of different max_workers count. I think it is not too far-fetched to create a new futures object each time", suggest a fundamental confusion of the terms "future" and "executor". Executors are not futures - the return value from an executor's submit method is a future.

Cheers,
Nick.

--
Nick Coghlan | ncoghlan@gmail.com | Brisbane, Australia
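The pattern Nick describes can be sketched as follows (a minimal illustration with invented names, not the original test program): a bounded Queue.get(timeout=...) keeps the main thread responsive instead of hanging forever when an expected result never arrives.

```python
import queue
import concurrent.futures

def run_tasks(n):
    out_q = queue.Queue()
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
        submitted = 0
        for i in range(n):
            # Each task just deposits its value in the output queue.
            ex.submit(out_q.put, i)
            submitted += 1
        while submitted:
            try:
                # A bounded wait keeps the main thread interruptible and
                # avoids hanging forever if a result goes missing.
                results.append(out_q.get(timeout=1.0))
            except queue.Empty:
                break
            submitted -= 1
    return results

print(len(run_tasks(50)))
```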
participants (5)
- Brian Quinlan
- Nick Coghlan
- Scott Dial
- Steven D'Aprano
- Thomas Nagy