[Python-checkins] bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453)

Kyle Stanley webhook-mailer at python.org
Sun Apr 19 10:01:04 EDT 2020


https://github.com/python/cpython/commit/1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe
commit: 1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe
branch: master
author: Kyle Stanley <aeros167 at gmail.com>
committer: GitHub <noreply at github.com>
date: 2020-04-19T07:00:59-07:00
summary:

bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453)



Roughly based on https://github.com/python/cpython/commit/904e34d4e6b6007986dcc585d5c553ee8ae06f95, but with a few substantial differences.

/cc @pitrou @brianquinlan
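
For context, a minimal sketch of the user-visible effect (not part of the commit): it assumes a build that includes this change, and it inspects the private `_processes` attribute purely for demonstration.

    from concurrent.futures import ProcessPoolExecutor
    from operator import mul

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=4) as executor:
            # Previously, the first submit() eagerly spawned all 4 workers.
            # With this change, a single worker is spawned and then reused
            # as long as it is idle when the next task arrives.
            executor.submit(mul, 21, 2).result()
            executor.submit(mul, 6, 7).result()
            # Internal attribute, read here only for illustration.
            print(len(executor._processes))  # expected: 1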

files:
A Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst
M Doc/whatsnew/3.9.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures.py

diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst
index 8147d8f185c7c..c4b49feed9fc1 100644
--- a/Doc/whatsnew/3.9.rst
+++ b/Doc/whatsnew/3.9.rst
@@ -206,6 +206,11 @@ and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves
 compatibility with subinterpreters and predictability in their shutdown
 processes. (Contributed by Kyle Stanley in :issue:`39812`.)
 
+Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
+demand, only when there are no available idle workers to reuse. This reduces
+startup overhead and the amount of CPU time lost to idle workers.
+(Contributed by Kyle Stanley in :issue:`39207`.)
+
 curses
 ------
 
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 4c39500d675ff..36355ae8756db 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -318,6 +318,12 @@ def run(self):
                 # while waiting on new results.
                 del result_item
 
+                # attempt to increment idle process count
+                executor = self.executor_reference()
+                if executor is not None:
+                    executor._idle_worker_semaphore.release()
+                del executor
+
             if self.is_shutting_down():
                 self.flag_executor_shutting_down()
 
@@ -601,6 +607,7 @@ def __init__(self, max_workers=None, mp_context=None,
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
+        self._idle_worker_semaphore = threading.Semaphore(0)
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
@@ -633,14 +640,18 @@ def __init__(self, max_workers=None, mp_context=None,
     def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
-            self._adjust_process_count()
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
             _threads_wakeups[self._executor_manager_thread] = \
                 self._executor_manager_thread_wakeup
 
     def _adjust_process_count(self):
-        for _ in range(len(self._processes), self._max_workers):
+        # if there's an idle process, we don't need to spawn a new one.
+        if self._idle_worker_semaphore.acquire(blocking=False):
+            return
+
+        process_count = len(self._processes)
+        if process_count < self._max_workers:
             p = self._mp_context.Process(
                 target=_process_worker,
                 args=(self._call_queue,
@@ -669,6 +680,7 @@ def submit(self, fn, /, *args, **kwargs):
             # Wake up queue management thread
             self._executor_manager_thread_wakeup.wakeup()
 
+            self._adjust_process_count()
             self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
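
The hunks above carry the core mechanism: the manager thread releases `_idle_worker_semaphore` after each completed work item, and `submit()` now calls `_adjust_process_count()`, which does a non-blocking acquire and only spawns a new process when no idle worker is available. Below is a standalone sketch of the same pattern; it is illustration only (threads instead of processes, hypothetical `OnDemandPool` name), not CPython code.

    import queue
    import threading

    class OnDemandPool:
        """Illustration only: spawn a worker only when no idle worker exists."""

        def __init__(self, max_workers):
            self._max_workers = max_workers
            self._workers = []
            self._tasks = queue.Queue()
            # Counts workers that finished a task and are waiting for more work.
            self._idle = threading.Semaphore(0)

        def submit(self, fn, *args):
            self._tasks.put((fn, args))
            # If an idle worker exists, consume one idle "token" and let that
            # worker pick the task up; otherwise spawn a new worker, up to the
            # configured maximum (mirrors _adjust_process_count above).
            if self._idle.acquire(blocking=False):
                return
            if len(self._workers) < self._max_workers:
                worker = threading.Thread(target=self._run_worker, daemon=True)
                self._workers.append(worker)
                worker.start()

        def _run_worker(self):
            while True:
                fn, args = self._tasks.get()
                fn(*args)
                # Mirrors the semaphore release in the manager thread's run().
                self._idle.release()

The non-blocking acquire keeps submit() cheap: it never waits for a worker to become idle, it only checks whether one already is.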
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 868415ab29916..a8c5bb6aa1a3a 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -486,10 +486,16 @@ def _prime_executor(self):
         pass
 
     def test_processes_terminate(self):
-        self.executor.submit(mul, 21, 2)
-        self.executor.submit(mul, 6, 7)
-        self.executor.submit(mul, 3, 14)
-        self.assertEqual(len(self.executor._processes), 5)
+        def acquire_lock(lock):
+            lock.acquire()
+
+        mp_context = get_context()
+        sem = mp_context.Semaphore(0)
+        for _ in range(3):
+            self.executor.submit(acquire_lock, sem)
+        self.assertEqual(len(self.executor._processes), 3)
+        for _ in range(3):
+            sem.release()
         processes = self.executor._processes
         self.executor.shutdown()
 
@@ -964,6 +970,36 @@ def test_ressources_gced_in_workers(self):
         mgr.shutdown()
         mgr.join()
 
+    def test_saturation(self):
+        executor = self.executor_type(4)
+        mp_context = get_context()
+        sem = mp_context.Semaphore(0)
+        job_count = 15 * executor._max_workers
+        try:
+            for _ in range(job_count):
+                executor.submit(sem.acquire)
+            self.assertEqual(len(executor._processes), executor._max_workers)
+            for _ in range(job_count):
+                sem.release()
+        finally:
+            executor.shutdown()
+
+    def test_idle_process_reuse_one(self):
+        executor = self.executor_type(4)
+        executor.submit(mul, 21, 2).result()
+        executor.submit(mul, 6, 7).result()
+        executor.submit(mul, 3, 14).result()
+        self.assertEqual(len(executor._processes), 1)
+        executor.shutdown()
+
+    def test_idle_process_reuse_multiple(self):
+        executor = self.executor_type(4)
+        executor.submit(mul, 12, 7).result()
+        executor.submit(mul, 33, 25)
+        executor.submit(mul, 25, 26).result()
+        executor.submit(mul, 18, 29)
+        self.assertLessEqual(len(executor._processes), 2)
+        executor.shutdown()
 
 create_executor_tests(ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst b/Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst
new file mode 100644
index 0000000000000..3fa82771ded23
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-04-10-01-24-58.bpo-39207.2dE5Ox.rst
@@ -0,0 +1,4 @@
+Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
+demand, only when there are no available idle workers to reuse. This reduces
+startup overhead and the amount of CPU time lost to idle workers.
+Patch by Kyle Stanley.
\ No newline at end of file


