[Python-checkins] bpo-39995: Fix concurrent.futures _ThreadWakeup (GH-19760)

Victor Stinner webhook-mailer at python.org
Tue Apr 28 21:32:15 EDT 2020


https://github.com/python/cpython/commit/a4dfe8ede5a37576e17035dccfe109ba7752237e
commit: a4dfe8ede5a37576e17035dccfe109ba7752237e
branch: master
author: Victor Stinner <vstinner at python.org>
committer: GitHub <noreply at github.com>
date: 2020-04-29T03:32:06+02:00
summary:

bpo-39995: Fix concurrent.futures _ThreadWakeup (GH-19760)

Fix a race condition in concurrent.futures._ThreadWakeup: access to
_ThreadWakeup is now protected with the shutdown lock.

files:
A Misc/NEWS.d/next/Library/2020-04-28-18-25-27.bpo-39995.WmA3Gk.rst
M Lib/concurrent/futures/process.py

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 8e9b69a8f08b4..a76e2c9cf231a 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -90,6 +90,7 @@ def _python_exit():
     _global_shutdown = True
     items = list(_threads_wakeups.items())
     for _, thread_wakeup in items:
+        # call not protected by ProcessPoolExecutor._shutdown_lock
         thread_wakeup.wakeup()
     for t, _ in items:
         t.join()
@@ -157,8 +158,10 @@ def __init__(self, work_id, fn, args, kwargs):
 
 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
+                 thread_wakeup):
         self.pending_work_items = pending_work_items
+        self.shutdown_lock = shutdown_lock
         self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)
 
@@ -167,7 +170,8 @@ def _on_queue_feeder_error(self, e, obj):
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
-            self.thread_wakeup.wakeup()
+            with self.shutdown_lock:
+                self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this
             # case, the executor_manager_thread fails all work_items
             # with BrokenProcessPool
@@ -268,6 +272,7 @@ def __init__(self, executor):
         # A _ThreadWakeup to allow waking up the queue_manager_thread from the
         # main Thread and avoid deadlocks caused by permanently locked queues.
         self.thread_wakeup = executor._executor_manager_thread_wakeup
+        self.shutdown_lock = executor._shutdown_lock
 
         # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
         # to determine if the ProcessPoolExecutor has been garbage collected
@@ -275,10 +280,13 @@ def __init__(self, executor):
         # When the executor gets garbage collected, the weakref callback
         # will wake up the queue management thread so that it can terminate
         # if there is no pending work item.
-        def weakref_cb(_, thread_wakeup=self.thread_wakeup):
+        def weakref_cb(_,
+                       thread_wakeup=self.thread_wakeup,
+                       shutdown_lock=self.shutdown_lock):
             mp.util.debug('Executor collected: triggering callback for'
                           ' QueueManager wakeup')
-            thread_wakeup.wakeup()
+            with shutdown_lock:
+                thread_wakeup.wakeup()
 
         self.executor_reference = weakref.ref(executor, weakref_cb)
 
@@ -363,6 +371,7 @@ def wait_result_broken_or_wakeup(self):
         # submitted, from the executor being shutdown/gc-ed, or from the
         # shutdown of the python interpreter.
         result_reader = self.result_queue._reader
+        assert not self.thread_wakeup._closed
         wakeup_reader = self.thread_wakeup._reader
         readers = [result_reader, wakeup_reader]
         worker_sentinels = [p.sentinel for p in self.processes.values()]
@@ -380,7 +389,9 @@ def wait_result_broken_or_wakeup(self):
 
         elif wakeup_reader in ready:
             is_broken = False
-        self.thread_wakeup.clear()
+
+        with self.shutdown_lock:
+            self.thread_wakeup.clear()
 
         return result_item, is_broken, cause
 
@@ -500,7 +511,8 @@ def join_executor_internals(self):
         # Release the queue's resources as soon as possible.
         self.call_queue.close()
         self.call_queue.join_thread()
-        self.thread_wakeup.close()
+        with self.shutdown_lock:
+            self.thread_wakeup.close()
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in self.processes.values():
@@ -619,6 +631,8 @@ def __init__(self, max_workers=None, mp_context=None,
         # _result_queue to send wakeup signals to the executor_manager_thread
         # as it could result in a deadlock if a worker process dies with the
         # _result_queue write lock still acquired.
+        #
+        # _shutdown_lock must be locked to access _ThreadWakeup.
         self._executor_manager_thread_wakeup = _ThreadWakeup()
 
         # Create communication channels for the executor
@@ -629,6 +643,7 @@ def __init__(self, max_workers=None, mp_context=None,
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
             pending_work_items=self._pending_work_items,
+            shutdown_lock=self._shutdown_lock,
             thread_wakeup=self._executor_manager_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
@@ -718,12 +733,12 @@ def shutdown(self, wait=True, *, cancel_futures=False):
         with self._shutdown_lock:
             self._cancel_pending_futures = cancel_futures
             self._shutdown_thread = True
+            if self._executor_manager_thread_wakeup is not None:
+                # Wake up queue management thread
+                self._executor_manager_thread_wakeup.wakeup()
 
-        if self._executor_manager_thread:
-            # Wake up queue management thread
-            self._executor_manager_thread_wakeup.wakeup()
-            if wait:
-                self._executor_manager_thread.join()
+        if self._executor_manager_thread is not None and wait:
+            self._executor_manager_thread.join()
         # To reduce the risk of opening too many files, remove references to
         # objects that use file descriptors.
         self._executor_manager_thread = None
@@ -732,8 +747,6 @@ def shutdown(self, wait=True, *, cancel_futures=False):
             self._result_queue.close()
         self._result_queue = None
         self._processes = None
-
-        if self._executor_manager_thread_wakeup:
-            self._executor_manager_thread_wakeup = None
+        self._executor_manager_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/Misc/NEWS.d/next/Library/2020-04-28-18-25-27.bpo-39995.WmA3Gk.rst b/Misc/NEWS.d/next/Library/2020-04-28-18-25-27.bpo-39995.WmA3Gk.rst
new file mode 100644
index 0000000000000..24aff65736337
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-04-28-18-25-27.bpo-39995.WmA3Gk.rst
@@ -0,0 +1,2 @@
+Fix a race condition in concurrent.futures._ThreadWakeup: access to
+_ThreadWakeup is now protected with the shutdown lock.



More information about the Python-checkins mailing list