[Python-checkins] cpython (merge 3.2 -> default): Issue #10332: multiprocessing: fix a race condition when a Pool is closed

charles-francois.natali python-checkins at python.org
Mon Oct 24 18:45:29 CEST 2011


http://hg.python.org/cpython/rev/c2cdabc44665
changeset:   73101:c2cdabc44665
parent:      73098:9d8a14a550a3
parent:      73100:52c98a729a71
user:        Charles-François Natali <neologix at free.fr>
date:        Mon Oct 24 18:47:43 2011 +0200
summary:
  Issue #10332: multiprocessing: fix a race condition when a Pool is closed
before all tasks have completed.

files:
  Lib/multiprocessing/pool.py      |   6 +++++-
  Lib/test/test_multiprocessing.py |  14 ++++++++++++++
  Misc/NEWS                        |   3 +++
  3 files changed, 22 insertions(+), 1 deletions(-)


diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -321,7 +321,11 @@
 
     @staticmethod
     def _handle_workers(pool):
-        while pool._worker_handler._state == RUN and pool._state == RUN:
+        thread = threading.current_thread()
+
+        # Keep maintaining workers until the cache gets drained, unless the pool
+        # is terminated.
+        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
             pool._maintain_pool()
             time.sleep(0.1)
         # send sentinel to stop workers
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1266,6 +1266,20 @@
         p.close()
         p.join()
 
+    def test_pool_worker_lifetime_early_close(self):
+        # Issue #10332: closing a pool whose workers have limited lifetimes
+        # before all the tasks completed would make join() hang.
+        p = multiprocessing.Pool(3, maxtasksperchild=1)
+        results = []
+        for i in range(6):
+            results.append(p.apply_async(sqr, (i, 0.3)))
+        p.close()
+        p.join()
+        # check the results
+        for (j, res) in enumerate(results):
+            self.assertEqual(res.get(), sqr(j))
+
+
 #
 # Test that manager has expected number of shared objects left
 #
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -338,6 +338,9 @@
 Library
 -------
 
+- Issue #10332: multiprocessing: fix a race condition when a Pool is closed
+  before all tasks have completed.
+
 - Issue #13255: wrong docstrings in array module.
 
 - Issue #8540: Remove deprecated Context._clamp attribute in Decimal module.

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list