concurrent.futures cancel pending at Executor.shutdown
Hi, I think it would be very helpful to have an additional argument (cancel, for example) added to Executor.shutdown that cancels all pending futures submitted to the executor. The context manager would then gain the ability to abort all futures in case of an exception. Additionally, this would implement the missing counterpart of the multiprocessing module's terminate; currently we only have close.
On Jan 2, 2020, at 20:40, Miguel Ángel Prosper <miguelangel.prosper@gmail.com> wrote:
I think it would be very helpful to have an additional argument (cancel for example) added to Executor.shutdown that cancels all pending futures submitted to the executor.
Then the context manager would gain the ability to abort all futures in case of an exception, additionally this would also implement the missing counterpart of the multiprocessing module terminate, currently we only have close.
But terminate kills the process abruptly, skipping with/finally and atexit and internal interpreter cleanup, orphaning any children, etc. This is rarely what you want, especially not if you’re sharing a queue and a lock with the process. And I think you can’t cleanly abort existing futures once you’ve done that; you have to mark them as being in an unknown state, not call any callbacks, and probably raise if the parent does anything but discard them uninspected. Also, there’s no way to do the same thing for threads, and if you could, it would leave the process in a corrupted state.

Having a way to clear the queue and then shut down once existing jobs are done is a lot more manageable. But it’s not terminate; there’s still a probably short but potentially unbounded wait for shutdown to finish. (Or, if you haven’t designed your jobs to be appropriately small, a probably _long_ wait.) So the only clean way to do this is cooperative: flush the queue, send some kind of message to all children telling them to finish as quickly as possible, then wait for them to finish.

If you’re looking for a way to do a “graceful shutdown if possible but after N seconds just go down hard” the way, e.g., Apache or nginx does, I don’t think that could be done safely by killing child threads in Python. (In C, you can write code to handle any emergency situation, up to even having a trashed heap and stack, if you’re careful; in Python you can’t, because you can’t interpret a single Python instruction inside such an emergency.) But since you’re shutting down as quickly as possible after the N seconds are up, you can just daemonize the whole pool, do your emergency shutdown code, and exit or abort, after which the daemon threads get safely killed immediately.
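Until something like this exists in the executor itself, the cooperative pattern described above (flush the queue, tell the running jobs to hurry, then wait) can be approximated from the outside with the current API. This is only an illustration under stated assumptions: the jobs are written to poll a shared `threading.Event`, and `job`, `stop` and `cooperative_shutdown` are hypothetical names. It relies on `Future.cancel()` succeeding only for futures that have not started running.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def job(stop, n):
    """Hypothetical job that does its work in small steps and polls `stop`."""
    for _ in range(100):
        if stop.is_set():
            return f"job {n} stopped early"
        time.sleep(0.01)  # one small unit of work
    return f"job {n} finished"


def cooperative_shutdown(executor, futures, stop):
    """Hypothetical helper: flush the queue, signal running jobs, then wait."""
    # 1. Flush the queue: cancel() only succeeds for futures not yet running.
    cancelled = sum(f.cancel() for f in futures)
    # 2. Ask the jobs that are already running to finish as quickly as possible.
    stop.set()
    # 3. Wait for the running jobs to return on their own.
    executor.shutdown(wait=True)
    return cancelled


stop = threading.Event()
executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(job, stop, n) for n in range(10)]
time.sleep(0.05)  # let the two workers pick up their first jobs
cancelled = cooperative_shutdown(executor, futures, stop)
print(cancelled, [f.result() for f in futures if not f.cancelled()])
```

Nothing here kills a thread: a job that never checks the event still runs to completion, which is exactly the potentially unbounded wait described above.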
I was personally thinking of an implementation like that: cancel all futures still pending and, if wait is true, then wait for the running ones, for both implementations. I didn't actually mean terminate literally; I just called it that because that's what multiprocessing.dummy.Pool.terminate (+ join afterwards) does.
Looking at the implementation in concurrent/futures/thread.py, it looks like each of the worker threads repeatedly gets one item from the queue, runs it, and then checks if the executor is being shut down. Worker threads get added dynamically until the executor's max thread count is reached. New futures cannot be submitted when the executor is being shut down. The shutdown() method waits until all workers have cleanly exited. So it looks like the only time when submitted futures are neither executed nor cancelled is when there are more items in the work queue than there are worker threads. In this situation the worker threads just exit, and the unprocessed items will stay pending forever.

If I analyzed this correctly, perhaps we can add some functionality where leftover work items are explicitly cancelled? I think that would satisfy the OP's requirement. I *think* it would be safe to do this in shutdown() after it has set self._shutdown but before it waits for the worker threads.

--Guido

On Fri, Jan 3, 2020 at 10:10 AM Miguel Ángel Prosper <miguelangel.prosper@gmail.com> wrote:
-- --Guido van Rossum (python.org/~guido) *Pronouns: he/him **(why is my pronoun here?)* <http://feministing.com/2015/02/03/how-using-they-as-a-singular-pronoun-can-c...>
gets one item from the queue, runs it, and then checks if the executor is being shut down.
That's exactly what I thought at first, but right after that the continue statement skips that check, so all futures always get processed. Only when the sentinel, which is placed at the end of the queue, is reached does the worker actually do the check.
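The effect of that `continue` can be seen in a deliberately simplified model of the loop. This is not CPython's `_worker` source; `ToyExecutor` and `toy_worker` are hypothetical stand-ins that keep only the control flow under discussion: the shutdown flag is read only when the `None` sentinel comes off the queue, so every item queued ahead of the sentinel still runs even though shutdown was already requested.

```python
import queue
import threading


class ToyExecutor:
    """Hypothetical stand-in holding only the shutdown flag a worker reads."""

    def __init__(self):
        self._shutdown = False


def toy_worker(executor, work_queue, results):
    # Simplified model of the loop being discussed, not CPython's code.
    while True:
        work_item = work_queue.get(block=True)
        if work_item is not None:
            results.append(work_item())
            continue  # straight back to get(); the flag is never read here
        # Only the None sentinel reaches the shutdown check.
        if executor._shutdown:
            work_queue.put(None)  # pass the sentinel on to any other worker
            return


executor = ToyExecutor()
work_queue = queue.SimpleQueue()
results = []
for i in range(5):
    work_queue.put(lambda i=i: i * i)
executor._shutdown = True  # shutdown requested while items are still queued
work_queue.put(None)       # the sentinel goes in *after* the pending items
worker = threading.Thread(target=toy_worker, args=(executor, work_queue, results))
worker.start()
worker.join()
print(results)  # every queued item ran despite the shutdown flag
```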
perhaps we can add some functionality where leftover work items are explicitly cancelled? I think that would satisfy the OP's requirement.
Yes, that would be perfect; that way we could have a controlled but fast exit for certain scenarios, such as the ones listed in the first post. From what I examined, the procedure would be very similar to the one followed when an initializer fails: both implementations clear all pending futures when that happens. For ThreadPoolExecutor specifically, calling a modified version of _initializer_failed at the point discussed would seem to be sufficient, replacing the call to Future.set_exception with Future.cancel.
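A minimal sketch of that idea, assuming the work queue holds items that each carry a `Future` (as the `_initializer_failed` path appears to): drain the queue without blocking and cancel each item's future instead of setting an exception on it. `WorkItem` and `cancel_pending` are hypothetical names, and creating `Future()` objects directly is normally reserved for tests like this one.

```python
import queue
from concurrent.futures import Future
from dataclasses import dataclass


@dataclass
class WorkItem:
    """Hypothetical stand-in for the executor's internal work item."""
    future: Future


def cancel_pending(work_queue):
    """Drain the queue and cancel each item's future instead of failing it."""
    cancelled = 0
    while True:
        try:
            work_item = work_queue.get_nowait()
        except queue.Empty:
            break
        if work_item is not None:  # skip any None wake-up sentinel
            if work_item.future.cancel():
                cancelled += 1
    return cancelled


work_queue = queue.SimpleQueue()
futures = [Future() for _ in range(3)]
for f in futures:
    work_queue.put(WorkItem(f))
work_queue.put(None)
print(cancel_pending(work_queue), [f.cancelled() for f in futures])
```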
On Fri, Jan 3, 2020 at 3:28 PM Miguel Ángel Prosper < miguelangel.prosper@gmail.com> wrote:
Keen eye!
OK, since the current behavior of shutdown() is "disallow submitting new futures and wait for all currently queued futures to be processed", we definitely need a new flag. It looks like you have a good handle on the code -- do you want to submit a PR to GitHub to add such a parameter?
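The current default behavior can be checked with a short script: `shutdown(wait=True)` blocks until every queued future has run (none are cancelled), and `submit()` raises `RuntimeError` afterwards. The job (`time.sleep`) and the counts are arbitrary choices for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(time.sleep, 0.01) for _ in range(5)]
executor.shutdown(wait=True)  # default: blocks until the whole queue is drained

# Every queued future ran to completion; none was cancelled.
print([f.done() and not f.cancelled() for f in futures])

# New submissions are refused once shutdown() has been called.
try:
    executor.submit(print, "too late")
    rejected = False
except RuntimeError:
    rejected = True
print("rejected:", rejected)
```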
It looks like you have a good handle on the code -- do you want to submit a PR to GitHub to add such a parameter?
Thanks, but I'm not really sure how to implement it in the ProcessPoolExecutor; I just think the solution is probably related to the code responsible for handling a failed initializer (since they do very similar things). For the ThreadPoolExecutor maybe I could, but I haven't really checked for side effects or weird things.
Is anyone else interested in implementing this small feature for concurrent.futures?
I would certainly be willing to look into it. We've been discussing the possibility of a native threadpool for asyncio in the future (https://bugs.python.org/issue32309), so it would certainly be beneficial for me to build some experience working with the internals of the executors. I think implementing this small feature would be a good introduction.

On Wed, Jan 15, 2020 at 5:37 PM Guido van Rossum <guido@python.org> wrote:
I would certainly be willing to look into it.
As an update to this thread for anyone interested in this feature: it has been implemented in Python 3.9 for both ProcessPoolExecutor and ThreadPoolExecutor as a new parameter to Executor.shutdown(), *cancel_futures*.

For a description of the feature, see the updated documentation: https://docs.python.org/3.9/library/concurrent.futures.html#concurrent.futur...

For implementation details, see the PR: https://github.com/python/cpython/pull/18057

Also, thank you Guido for bringing attention to the issue. The implementation was a bit more involved than I initially anticipated (particularly for ProcessPoolExecutor), but I found it to be well worth the effort, both for the benefit of the new feature and for the opportunity to work with a part of the internals of the executors.

On Wed, Jan 15, 2020 at 5:59 PM Kyle Stanley <aeros167@gmail.com> wrote:
Thank you so much for the work! I was very confused about how to even start implementing it in the ProcessPoolExecutor, but you finished everything super quickly! I suppose this might be better in another thread, but... would it be possible to change the context manager of the executor to cancel futures on uncaught exceptions? If not, it could also be added as an optional parameter to the executor constructor, "cancel_on_error", set to False so as not to change anything. Personally, I was thinking of using the executor like that; currently I have to either place everything in a try/except block, handle it and then re-raise it, or subclass each executor.
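For reference, the try/except-and-re-raise workaround mentioned above might look like this with the new parameter. It assumes Python 3.9+ (for `cancel_futures`); `submit_all` is a hypothetical helper, and the failure is simulated. With a single worker busy on (at most) the first job, everything still in the queue is cancelled before the original exception propagates.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def submit_all(executor, futures, n_jobs, fail_after=None):
    """Hypothetical helper: submit jobs, optionally failing part-way through."""
    try:
        for i in range(n_jobs):
            futures.append(executor.submit(time.sleep, 0.2))
            if i == fail_after:
                raise RuntimeError(f"failed after submitting job {i}")
    except BaseException:
        # Python 3.9+: cancel whatever is still queued, wait for running jobs,
        # then re-raise the original exception unchanged.
        executor.shutdown(wait=True, cancel_futures=True)
        raise
    executor.shutdown(wait=True)


futures = []
try:
    submit_all(ThreadPoolExecutor(max_workers=1), futures, n_jobs=5, fail_after=3)
except RuntimeError as exc:
    print("re-raised:", exc)
print([f.cancelled() for f in futures])
```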
Miguel Ángel Prosper wrote:
No problem! The ProcessPoolExecutor implementation wasn't immediately clear to me either, but after some experimentation (and some great suggestions from Brian Quinlan to improve it) I was able to get it functioning as intended. Thank you for the proposal; I think it will be a good addition to the Executor API.

Miguel Ángel Prosper wrote:
Hmm, it should be possible. Do you specifically mean cancelling the pending futures once a single one of the submitted functions raises an exception, or cancelling the pending futures when the Executor itself raises an exception (i.e. BrokenProcessPool)? I would assume the former, since that seems more useful to me.

Miguel Ángel Prosper wrote:
If not, it could also be added as an optional parameter to the executor constructor, "cancel_on_error", set to False so as not to change anything.
Assuming we did implement something like this, I'd say it should be done as an optional parameter for the constructor (as stated above) rather than as the default behavior when an exception occurs, because it would be significantly different from the current behavior. At the moment, the executor doesn't mind when exceptions are raised from submitted functions; it simply sets the exception on the future (via ``future.set_exception()``) and moves on to the next one. So cancelling the pending futures after one raises an exception would be a significant behavioral change, one that could easily break existing code. As a result, I think it should be an optional parameter, if anything.

As far as whether or not it should be implemented, I'm not 100% certain. While I'm not against the idea, I think it would be reasonable to wait until the new parameter is released in Python 3.9 and see what users think of it before adding a similar parameter to the executor constructor as well. The cost of adding a new feature demanded by users is relatively minimal, but once it's added, we're stuck with maintaining it. If the idea is approved, I'd be happy to help with implementing it; I just want to make sure that we actually want it in the first place.

What do you think, Guido? I've also CC'd Brian Quinlan and Antoine Pitrou, in case they have any feedback but didn't see the original thread.

On Mon, Feb 3, 2020 at 2:27 AM Miguel Ángel Prosper <miguelangel.prosper@gmail.com> wrote:
I was referring to any exception within the context manager of the executor, not to exceptions from the futures or only the executor's own exceptions; something such as the following:

```
from concurrent.futures import ThreadPoolExecutor

# cancel_on_error is the proposed parameter; it does not exist in the stdlib
with ThreadPoolExecutor(cancel_on_error=True) as executor:
    # Code here
    raise Exception
```

Then if Executor.__exit__ detects an exception, it would call shutdown with cancel_futures set to True.
Then if Executor.__exit__ detects an exception it would call shutdown with cancel_futures set to True.
Oh, I see. That should be rather simple:

```
def __exit__(self, exc_type, exc_val, exc_tb):
    # self._cancel_on_error would be set by the proposed constructor parameter
    if exc_val is not None and self._cancel_on_error:
        self.shutdown(wait=True, cancel_futures=True)
    else:
        self.shutdown(wait=True)
    return False  # don't suppress the exception
```

(I believe the above would have to be specified as an override for each of ThreadPoolExecutor and ProcessPoolExecutor if *cancel_on_error* were supplied as a parameter to the executor constructor. They both currently use the ``__exit__`` specified in the abstract Executor class, which wouldn't have access to *cancel_on_error*.)

My previous consideration about waiting for user input on *cancel_futures* after the release of 3.9, before adding *cancel_on_error* to the constructor, still applies to some degree though. If it directly uses executor.shutdown() with cancel_futures set to True, the maintenance would be rather minimal. But it would potentially risk adding an underutilized parameter to the executor constructor (which contributes to feature bloat).

On Mon, Feb 3, 2020 at 4:03 AM Miguel Ángel Prosper <miguelangel.prosper@gmail.com> wrote:
But, it would potentially risk adding an underutilized parameter to the executor constructor (which contributes to feature bloat).
That's true. Personally, I would always enable cancel_on_error (which would make the parameter redundant, and it could then be implemented in the abstract class), but that's just my use case. You can always add that functionality later with your own subclass. Personally, I think it makes sense to do it by default, but that's just me, and it also changes the current behaviour.
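Such a subclass could be as small as the sketch below. `CancelOnErrorThreadPoolExecutor` and its `cancel_on_error` keyword are hypothetical (the stdlib executors have no such parameter), the `__exit__` body follows the one proposed earlier in the thread, and `cancel_futures` needs Python 3.9+.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class CancelOnErrorThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical subclass; `cancel_on_error` is not a stdlib parameter."""

    def __init__(self, *args, cancel_on_error=False, **kwargs):
        super().__init__(*args, **kwargs)
        self._cancel_on_error = cancel_on_error

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Cancel still-queued futures only if the with-block raised.
        if exc_val is not None and self._cancel_on_error:
            self.shutdown(wait=True, cancel_futures=True)
        else:
            self.shutdown(wait=True)
        return False  # never swallow the exception


futures = []
try:
    with CancelOnErrorThreadPoolExecutor(max_workers=1, cancel_on_error=True) as ex:
        for _ in range(4):
            futures.append(ex.submit(time.sleep, 0.2))
        raise ValueError("error inside the with block")
except ValueError:
    pass
print([f.cancelled() for f in futures])
```

With the default `cancel_on_error=False`, the class behaves exactly like `ThreadPoolExecutor`, which keeps the existing semantics for anyone who doesn't opt in.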
Thanks for all your hard work on the `cancel_futures` feature!

As you said, there is a complexity cost (both in terms of the API and the implementation) whenever a new feature is added. The current ProcessPoolExecutor implementation, in particular, is complex enough that I can't easily reason about its behavior, and there are some long-standing bugs that have been difficult to resolve. I was actually hoping that we wouldn't touch the API for a while so I could do some stability work.

I'm -0 on the `cancel_on_error` feature, but maybe it would help to hear about a concrete use case that can't be easily accomplished using the current API?

Cheers,
Brian

On Mon, Feb 3, 2020 at 12:36 AM Kyle Stanley <aeros167@gmail.com> wrote:
Honestly, I have to agree that `cancel_on_error` seems a bit excessive; unless it were enabled by default and implemented in the abstract class, it should probably not be included. That was just an idea to keep backwards compatibility. I will simply add a subclass of my preferred executor with the following for my particular use case:

```
def __exit__(self, exc_type, exc_val, exc_tb):
    # Cancel still-queued futures only if the with-block raised (Python 3.9+)
    self.shutdown(wait=True, cancel_futures=exc_val is not None)
    return False  # don't suppress the exception
```

Again, I think aborting futures on uncaught exceptions makes sense, but it would break backwards compatibility.
On Jan 3, 2020, at 10:11, Miguel Ángel Prosper <miguelangel.prosper@gmail.com> wrote:
OK, that makes sense. And it seems like it should be implementable; the only hard part is identifying all the edge cases and verifying they all do the right thing for both threads and processes.

But I don’t think “terminate” is the right name. Maybe “cancel”? Or even “shutdown(wait=whatever, cancel=True)”?

I think Java inspired this library, so maybe it’s worth looking at what they do. But IIRC, it’s a much more complicated API in general, and for this case you’d actually have to do something like this:

```
x.shutdown()   # stop new submissions
x.cancelAll()  # cancel all tasks still in the queue
x.purge()      # remove and handle all canceled tasks
x.join()       # wait for already-started tasks to finish
```

… which probably isn’t what we want.
I didn't actually mean terminate literally, I just called it that as that's what multiprocessing.dummy.Pool.terminate (+ join after) does.
IIRC, it only does that by accident, because dummy.Process.terminate is a no-op, and that isn’t documented but just happens to be what CPython does.
But I don’t think “terminate” is the right name. Maybe “cancel”? Or even “shutdown(wait=whatever, cancel=True)?”
"terminate" was definitely not a good name, especially because it doesn't actually terminate anything, it just cancels some of the operations. Since it also has to cooperate with the shutdown method, executing code after the state change but also before the wait, probably the best way to implement it would be just to add an additional parameter to shutdown. "cancel" seems perfect, by default set to false, to not change the current behavior. Then maybe it would be a good idea to change the __exit__ method to set cancel to true if an exception is being handled, but I'm not sure about that.
participants (5)
- Andrew Barnert
- Brian Quinlan
- Guido van Rossum
- Kyle Stanley
- Miguel Ángel Prosper