[Python-checkins] gh-90622: Do not spawn ProcessPool workers on demand via fork method. (GH-91598) (#92495)

gpshead webhook-mailer at python.org
Sun May 8 13:14:18 EDT 2022


https://github.com/python/cpython/commit/4270b7927de2260f5f1442bb90f788e9ad25ce9c
commit: 4270b7927de2260f5f1442bb90f788e9ad25ce9c
branch: 3.11
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: gpshead <greg at krypto.org>
date: 2022-05-08T10:14:14-07:00
summary:

gh-90622: Do not spawn ProcessPool workers on demand via fork method. (GH-91598) (#92495)

Do not spawn ProcessPoolExecutor workers on demand when the multiprocessing
start method is "fork".

This avoids potential deadlocks in the child processes caused by forking
from a multithreaded parent process.
(cherry picked from commit ebb37fc3fdcb03db4e206db017eeef7aaffbae84)

Co-authored-by: Gregory P. Smith <greg at krypto.org>

files:
A Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures.py
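
For background: forking a process that has running threads copies the
parent's memory, including any locks that happen to be held at that
moment, but only the forking thread survives in the child. A minimal,
hypothetical repro of that hazard (POSIX-only, not part of this commit;
it deadlocks by design):

    import os
    import threading
    import time

    lock = threading.Lock()

    def holder():
        with lock:
            time.sleep(30)  # hold the lock while the fork happens

    threading.Thread(target=holder, daemon=True).start()
    time.sleep(0.1)  # give the thread time to acquire the lock

    pid = os.fork()
    if pid == 0:
        # Child: the holder thread does not exist here, but the lock was
        # copied in the "held" state, so this acquire blocks forever.
        lock.acquire()
        os._exit(0)
    os.waitpid(pid, 0)

ProcessPoolExecutor runs an internal executor manager thread, so forking
new workers after that thread has started risks exactly this kind of
deadlock; the commit below sidesteps it by forking all workers up front,
before the thread exists.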

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 821034da21adc..7e2f5fa30e826 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -652,6 +652,10 @@ def __init__(self, max_workers=None, mp_context=None,
                 mp_context = mp.get_context()
         self._mp_context = mp_context
 
+        # https://github.com/python/cpython/issues/90622
+        self._safe_to_dynamically_spawn_children = (
+                self._mp_context.get_start_method(allow_none=False) != "fork")
+
         if initializer is not None and not callable(initializer):
             raise TypeError("initializer must be a callable")
         self._initializer = initializer
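
The new attribute above keys off the context's start method; anything
other than "fork" (i.e. "spawn" or "forkserver") keeps the on-demand
behavior. A quick illustration of the check (assuming a POSIX platform
where all three start methods are available):

    import multiprocessing as mp

    for method in ("fork", "spawn", "forkserver"):
        ctx = mp.get_context(method)
        # Only the "fork" context makes on-demand spawning unsafe here.
        safe = ctx.get_start_method(allow_none=False) != "fork"
        print(method, "on-demand safe:", safe)
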
@@ -714,6 +718,8 @@ def __init__(self, max_workers=None, mp_context=None,
     def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
+            if not self._safe_to_dynamically_spawn_children:  # i.e., using fork.
+                self._launch_processes()
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
             _threads_wakeups[self._executor_manager_thread] = \
@@ -726,15 +732,32 @@ def _adjust_process_count(self):
 
         process_count = len(self._processes)
         if process_count < self._max_workers:
-            p = self._mp_context.Process(
-                target=_process_worker,
-                args=(self._call_queue,
-                      self._result_queue,
-                      self._initializer,
-                      self._initargs,
-                      self._max_tasks_per_child))
-            p.start()
-            self._processes[p.pid] = p
+            # Assertion disabled as this codepath is also used to replace a
+            # worker that unexpectedly dies, even when using the 'fork' start
+            # method. That means there is still a potential deadlock bug. If a
+            # 'fork' mp_context worker dies, we'll be forking a new one when
+            # we know a thread is running (self._executor_manager_thread).
+            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
+            self._spawn_process()
+
+    def _launch_processes(self):
+        # https://github.com/python/cpython/issues/90622
+        assert not self._executor_manager_thread, (
+                'Processes cannot be fork()ed after the thread has started, '
+                'deadlock in the child processes could result.')
+        for _ in range(len(self._processes), self._max_workers):
+            self._spawn_process()
+
+    def _spawn_process(self):
+        p = self._mp_context.Process(
+            target=_process_worker,
+            args=(self._call_queue,
+                  self._result_queue,
+                  self._initializer,
+                  self._initargs,
+                  self._max_tasks_per_child))
+        p.start()
+        self._processes[p.pid] = p
 
     def submit(self, fn, /, *args, **kwargs):
         with self._shutdown_lock:
@@ -755,7 +778,8 @@ def submit(self, fn, /, *args, **kwargs):
             # Wake up queue management thread
             self._executor_manager_thread_wakeup.wakeup()
 
-            self._adjust_process_count()
+            if self._safe_to_dynamically_spawn_children:
+                self._adjust_process_count()
             self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
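
The net effect is visible from user code: with the "fork" context the
pool is fully populated on the first submit(), while "spawn" still grows
one worker at a time. A sketch of that difference (POSIX-only; note that
_processes is a private implementation detail, inspected here only for
illustration):

    import concurrent.futures as cf
    import multiprocessing as mp

    def count_workers(method):
        ctx = mp.get_context(method)
        with cf.ProcessPoolExecutor(max_workers=4, mp_context=ctx) as ex:
            ex.submit(pow, 2, 10).result()
            # "fork": all 4 workers were launched before the manager
            # thread started; "spawn": only the 1 worker needed so far.
            print(method, "->", len(ex._processes))

    if __name__ == "__main__":
        count_workers("fork")
        count_workers("spawn")
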
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 4363e90b8bbab..6f3b4609232bb 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -497,10 +497,16 @@ def acquire_lock(lock):
             lock.acquire()
 
         mp_context = self.get_context()
+        if mp_context.get_start_method(allow_none=False) == "fork":
+            # fork pre-spawns, not on demand.
+            expected_num_processes = self.worker_count
+        else:
+            expected_num_processes = 3
+
         sem = mp_context.Semaphore(0)
         for _ in range(3):
             self.executor.submit(acquire_lock, sem)
-        self.assertEqual(len(self.executor._processes), 3)
+        self.assertEqual(len(self.executor._processes), expected_num_processes)
         for _ in range(3):
             sem.release()
         processes = self.executor._processes
@@ -1021,6 +1027,8 @@ def test_saturation(self):
     def test_idle_process_reuse_one(self):
         executor = self.executor
         assert executor._max_workers >= 4
+        if self.get_context().get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
         executor.submit(mul, 21, 2).result()
         executor.submit(mul, 6, 7).result()
         executor.submit(mul, 3, 14).result()
@@ -1029,6 +1037,8 @@ def test_idle_process_reuse_one(self):
     def test_idle_process_reuse_multiple(self):
         executor = self.executor
         assert executor._max_workers <= 5
+        if self.get_context().get_start_method(allow_none=False) == "fork":
+            raise unittest.SkipTest("Incompatible with the fork start method.")
         executor.submit(mul, 12, 7).result()
         executor.submit(mul, 33, 25)
         executor.submit(mul, 25, 26).result()
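
Test suites that rely on on-demand worker creation can gate themselves
the same way the tests above now do. A minimal, hypothetical example of
the pattern:

    import multiprocessing as mp
    import unittest

    class OnDemandSpawnTest(unittest.TestCase):
        def test_workers_created_lazily(self):
            ctx = mp.get_context()
            if ctx.get_start_method(allow_none=False) == "fork":
                self.skipTest("fork pre-spawns the full worker pool")
            # ... assertions that assume workers are created on demand ...

    if __name__ == "__main__":
        unittest.main()
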
diff --git a/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst b/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst
new file mode 100644
index 0000000000000..5db0a1bbe721d
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst
@@ -0,0 +1,4 @@
+Worker processes for :class:`concurrent.futures.ProcessPoolExecutor` are no
+longer spawned on demand (a feature added in 3.9) when the multiprocessing
+context start method is ``"fork"`` as that can lead to deadlocks in the
+child processes due to a fork happening while threads are running.


