[Python-checkins] bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)

pitrou webhook-mailer at python.org
Sat Nov 20 15:19:51 EST 2021


https://github.com/python/cpython/commit/fdc0e09c3316098b038996c428e88931f0a4fcdb
commit: fdc0e09c3316098b038996c428e88931f0a4fcdb
branch: main
author: Logan Jones <loganasherjones at gmail.com>
committer: pitrou <pitrou at free.fr>
date: 2021-11-20T21:19:41+01:00
summary:

bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)

Co-authored-by: Antoine Pitrou <antoine at python.org>

files:
A Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst
M Doc/library/concurrent.futures.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures.py

diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index 897efc2f54442..b4213b451157e 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
 Calling :class:`Executor` or :class:`Future` methods from a callable submitted
 to a :class:`ProcessPoolExecutor` will result in deadlock.
 
-.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
+.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
 
    An :class:`Executor` subclass that executes calls asynchronously using a pool
    of at most *max_workers* processes.  If *max_workers* is ``None`` or not
@@ -252,6 +252,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
    pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
    as well as any attempt to submit more jobs to the pool.
 
+   *max_tasks_per_child* is an optional argument that specifies the maximum
+   number of tasks a single process can execute before it will exit and be
+   replaced with a fresh worker process. The default *max_tasks_per_child* is
+   ``None``, which means worker processes will live as long as the pool.
+
    .. versionchanged:: 3.3
       When one of the worker processes terminates abruptly, a
       :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
@@ -264,6 +269,10 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
 
       Added the *initializer* and *initargs* arguments.
 
+   .. versionchanged:: 3.11
+      The *max_tasks_per_child* argument was added to allow users to
+      control the lifetime of workers in the pool.
+
 
 .. _processpoolexecutor-example:
 
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9904db78c5b4c..19e93a608b276 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -141,10 +141,11 @@ def __init__(self, future, fn, args, kwargs):
         self.kwargs = kwargs
 
 class _ResultItem(object):
-    def __init__(self, work_id, exception=None, result=None):
+    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
         self.work_id = work_id
         self.exception = exception
         self.result = result
+        self.exit_pid = exit_pid
 
 class _CallItem(object):
     def __init__(self, work_id, fn, args, kwargs):
@@ -201,17 +202,19 @@ def _process_chunk(fn, chunk):
     return [fn(*args) for args in chunk]
 
 
-def _sendback_result(result_queue, work_id, result=None, exception=None):
+def _sendback_result(result_queue, work_id, result=None, exception=None,
+                     exit_pid=None):
     """Safely send back the given result or exception"""
     try:
         result_queue.put(_ResultItem(work_id, result=result,
-                                     exception=exception))
+                                     exception=exception, exit_pid=exit_pid))
     except BaseException as e:
         exc = _ExceptionWithTraceback(e, e.__traceback__)
-        result_queue.put(_ResultItem(work_id, exception=exc))
+        result_queue.put(_ResultItem(work_id, exception=exc,
+                                     exit_pid=exit_pid))
 
 
-def _process_worker(call_queue, result_queue, initializer, initargs):
+def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
     """Evaluates calls from call_queue and places the results in result_queue.
 
     This worker is run in a separate process.
@@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
             # The parent will notice that the process stopped and
             # mark the pool broken
             return
+    num_tasks = 0
+    exit_pid = None
     while True:
         call_item = call_queue.get(block=True)
         if call_item is None:
             # Wake up queue management thread
             result_queue.put(os.getpid())
             return
+
+        if max_tasks is not None:
+            num_tasks += 1
+            if num_tasks >= max_tasks:
+                exit_pid = os.getpid()
+
         try:
             r = call_item.fn(*call_item.args, **call_item.kwargs)
         except BaseException as e:
             exc = _ExceptionWithTraceback(e, e.__traceback__)
-            _sendback_result(result_queue, call_item.work_id, exception=exc)
+            _sendback_result(result_queue, call_item.work_id, exception=exc,
+                             exit_pid=exit_pid)
         else:
-            _sendback_result(result_queue, call_item.work_id, result=r)
+            _sendback_result(result_queue, call_item.work_id, result=r,
+                             exit_pid=exit_pid)
             del r
 
         # Liberate the resource as soon as possible, to avoid holding onto
         # open files or shared memory that is not needed anymore
         del call_item
 
+        if exit_pid is not None:
+            return
+
 
 class _ExecutorManagerThread(threading.Thread):
     """Manages the communication between this process and the worker processes.
@@ -301,6 +317,10 @@ def weakref_cb(_,
         # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
         self.work_ids_queue = executor._work_ids
 
+        # Maximum number of tasks a worker process can execute before
+        # exiting safely
+        self.max_tasks_per_child = executor._max_tasks_per_child
+
         # A dict mapping work ids to _WorkItems e.g.
         #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
         self.pending_work_items = executor._pending_work_items
@@ -320,15 +340,23 @@ def run(self):
                 return
             if result_item is not None:
                 self.process_result_item(result_item)
+
+                process_exited = result_item.exit_pid is not None
+                if process_exited:
+                    p = self.processes.pop(result_item.exit_pid)
+                    p.join()
+
                 # Delete reference to result_item to avoid keeping references
                 # while waiting on new results.
                 del result_item
 
-                # attempt to increment idle process count
-                executor = self.executor_reference()
-                if executor is not None:
-                    executor._idle_worker_semaphore.release()
-                del executor
+                if executor := self.executor_reference():
+                    if process_exited:
+                        with self.shutdown_lock:
+                            executor._adjust_process_count()
+                    else:
+                        executor._idle_worker_semaphore.release()
+                    del executor
 
             if self.is_shutting_down():
                 self.flag_executor_shutting_down()
@@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor):
 
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None, mp_context=None,
-                 initializer=None, initargs=()):
+                 initializer=None, initargs=(), *, max_tasks_per_child=None):
         """Initializes a new ProcessPoolExecutor instance.
 
         Args:
@@ -589,6 +617,11 @@ def __init__(self, max_workers=None, mp_context=None,
                 object should provide SimpleQueue, Queue and Process.
             initializer: A callable used to initialize worker processes.
             initargs: A tuple of arguments to pass to the initializer.
+            max_tasks_per_child: The maximum number of tasks a worker process can
+                complete before it will exit and be replaced with a fresh
+                worker process, to enable unused resources to be freed. The
+                default value is None, which means worker processes will live
+                as long as the executor.
         """
         _check_system_limits()
 
@@ -616,6 +649,13 @@ def __init__(self, max_workers=None, mp_context=None,
         self._initializer = initializer
         self._initargs = initargs
 
+        if max_tasks_per_child is not None:
+            if not isinstance(max_tasks_per_child, int):
+                raise TypeError("max_tasks_per_child must be an integer")
+            elif max_tasks_per_child <= 0:
+                raise ValueError("max_tasks_per_child must be >= 1")
+        self._max_tasks_per_child = max_tasks_per_child
+
         # Management thread
         self._executor_manager_thread = None
 
@@ -678,7 +718,8 @@ def _adjust_process_count(self):
                 args=(self._call_queue,
                       self._result_queue,
                       self._initializer,
-                      self._initargs))
+                      self._initargs,
+                      self._max_tasks_per_child))
             p.start()
             self._processes[p.pid] = p
 
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 84209ca2520b8..bbb6aa1eef81f 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -49,7 +49,6 @@ def create_future(state=PENDING, exception=None, result=None):
 
 INITIALIZER_STATUS = 'uninitialized'
 
-
 def mul(x, y):
     return x * y
 
@@ -1038,6 +1037,36 @@ def test_idle_process_reuse_multiple(self):
         self.assertLessEqual(len(executor._processes), 2)
         executor.shutdown()
 
+    def test_max_tasks_per_child(self):
+        executor = self.executor_type(1, max_tasks_per_child=3)
+        f1 = executor.submit(os.getpid)
+        original_pid = f1.result()
+        # The worker pid remains the same as the worker could be reused
+        f2 = executor.submit(os.getpid)
+        self.assertEqual(f2.result(), original_pid)
+        self.assertEqual(len(executor._processes), 1)
+        f3 = executor.submit(os.getpid)
+        self.assertEqual(f3.result(), original_pid)
+
+        # A new worker is spawned, with a statistically different pid,
+        # while the previous one was reaped.
+        f4 = executor.submit(os.getpid)
+        new_pid = f4.result()
+        self.assertNotEqual(original_pid, new_pid)
+        self.assertEqual(len(executor._processes), 1)
+
+        executor.shutdown()
+
+    def test_max_tasks_early_shutdown(self):
+        executor = self.executor_type(3, max_tasks_per_child=1)
+        futures = []
+        for i in range(6):
+            futures.append(executor.submit(mul, i, i))
+        executor.shutdown()
+        for i, future in enumerate(futures):
+            self.assertEqual(future.result(), mul(i, i))
+
+
 create_executor_tests(ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
                                        ProcessPoolForkserverMixin,
diff --git a/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst
new file mode 100644
index 0000000000000..666b5f7d0a035
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst
@@ -0,0 +1,3 @@
+Add ``max_tasks_per_child`` to :class:`concurrent.futures.ProcessPoolExecutor`.
+This allows users to specify the maximum number of tasks a single process
+should execute before the process needs to be restarted.



More information about the Python-checkins mailing list