[Python-checkins] [2.7] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-9686)
Antoine Pitrou
webhook-mailer at python.org
Wed Oct 3 07:50:12 EDT 2018
https://github.com/python/cpython/commit/4a7dd30f5810e8861a3834159a222ab32d5c97d0
commit: 4a7dd30f5810e8861a3834159a222ab32d5c97d0
branch: 2.7
author: tzickel <tzickel at users.noreply.github.com>
committer: Antoine Pitrou <pitrou at free.fr>
date: 2018-10-03T13:50:04+02:00
summary:
[2.7] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-9686)
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
files:
A Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst
M Lib/multiprocessing/pool.py
M Lib/test/test_multiprocessing.py
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index a47cd0f58a05..489c7d67cf34 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -162,7 +162,9 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
- args=(self, )
+ args=(self._cache, self._processes, self._pool, self.Process,
+ self._inqueue, self._outqueue, self._initializer,
+ self._initargs, self._maxtasksperchild, self._taskqueue)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
@@ -194,42 +196,56 @@ def __init__(self, processes=None, initializer=None, initargs=(),
exitpriority=15
)
- def _join_exited_workers(self):
+ @staticmethod
+ def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
- for i in reversed(range(len(self._pool))):
- worker = self._pool[i]
+ for i in reversed(range(len(pool))):
+ worker = pool[i]
if worker.exitcode is not None:
# worker exited
debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
- del self._pool[i]
+ del pool[i]
return cleaned
def _repopulate_pool(self):
+ return self._repopulate_pool_static(self._processes, self._pool,
+ self.Process, self._inqueue,
+ self._outqueue, self._initializer,
+ self._initargs,
+ self._maxtasksperchild)
+
+ @staticmethod
+ def _repopulate_pool_static(processes, pool, Process, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
- for i in range(self._processes - len(self._pool)):
- w = self.Process(target=worker,
- args=(self._inqueue, self._outqueue,
- self._initializer,
- self._initargs, self._maxtasksperchild)
- )
- self._pool.append(w)
+ for i in range(processes - len(pool)):
+ w = Process(target=worker,
+ args=(inqueue, outqueue,
+ initializer,
+ initargs, maxtasksperchild)
+ )
+ pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
debug('added worker')
- def _maintain_pool(self):
+ @staticmethod
+ def _maintain_pool(processes, pool, Process, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild):
"""Clean up any exited workers and start replacements for them.
"""
- if self._join_exited_workers():
- self._repopulate_pool()
+ if Pool._join_exited_workers(pool):
+ Pool._repopulate_pool_static(processes, pool, Process, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild)
def _setup_queues(self):
from .queues import SimpleQueue
@@ -319,16 +335,18 @@ def map_async(self, func, iterable, chunksize=None, callback=None):
return result
@staticmethod
- def _handle_workers(pool):
+ def _handle_workers(cache, processes, pool, Process, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild, taskqueue):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
- while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
- pool._maintain_pool()
+ while thread._state == RUN or (cache and thread._state != TERMINATE):
+ Pool._maintain_pool(processes, pool, Process, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild)
time.sleep(0.1)
# send sentinel to stop workers
- pool._taskqueue.put(None)
+ taskqueue.put(None)
debug('worker handler exiting')
@staticmethod
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index ff299feed894..d3192181e5ad 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1359,6 +1359,13 @@ def test_release_task_refs(self):
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)
+ def test_del_pool(self):
+ p = self.Pool(1)
+ wr = weakref.ref(p)
+ del p
+ gc.collect()
+ self.assertIsNone(wr())
+
def unpickleable_result():
return lambda: 42
diff --git a/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst b/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst
new file mode 100644
index 000000000000..d1c5a7721019
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst
@@ -0,0 +1 @@
+Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
More information about the Python-checkins
mailing list