[Python-checkins] cpython: Issue #23992: multiprocessing: make MapResult not fail-fast upon exception.

charles-francois.natali python-checkins at python.org
Wed Feb 10 17:58:47 EST 2016


https://hg.python.org/cpython/rev/1ba0deb52223
changeset:   100221:1ba0deb52223
user:        Charles-François Natali <cf.natali at gmail.com>
date:        Wed Feb 10 22:58:18 2016 +0000
summary:
  Issue #23992: multiprocessing: make MapResult not fail-fast upon exception.

files:
  Lib/multiprocessing/pool.py       |  20 +++++++++------
  Lib/test/_test_multiprocessing.py |  24 +++++++++++++++++++
  Misc/NEWS                         |   2 +
  3 files changed, 38 insertions(+), 8 deletions(-)


diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -638,22 +638,26 @@
             self._number_left = length//chunksize + bool(length % chunksize)
 
     def _set(self, i, success_result):
+        self._number_left -= 1
         success, result = success_result
-        if success:
+        if success and self._success:
             self._value[i*self._chunksize:(i+1)*self._chunksize] = result
-            self._number_left -= 1
             if self._number_left == 0:
                 if self._callback:
                     self._callback(self._value)
                 del self._cache[self._job]
                 self._event.set()
         else:
-            self._success = False
-            self._value = result
-            if self._error_callback:
-                self._error_callback(self._value)
-            del self._cache[self._job]
-            self._event.set()
+            if not success and self._success:
+                # only store first exception
+                self._success = False
+                self._value = result
+            if self._number_left == 0:
+                # only consider the result ready once all jobs are done
+                if self._error_callback:
+                    self._error_callback(self._value)
+                del self._cache[self._job]
+                self._event.set()
 
 #
 # Class whose instances are returned by `Pool.imap()`
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1660,6 +1660,10 @@
 def mul(x, y):
     return x*y
 
+def raise_large_valuerror(wait):
+    time.sleep(wait)
+    raise ValueError("x" * 1024**2)
+
 class SayWhenError(ValueError): pass
 
 def exception_throwing_generator(total, when):
@@ -1895,6 +1899,26 @@
             with self.assertRaises(RuntimeError):
                 p.apply(self._test_wrapped_exception)
 
+    def test_map_no_failfast(self):
+        # Issue #23992: the fail-fast behaviour when an exception is raised
+        # during map() would make Pool.join() deadlock, because a worker
+        # process would fill the result queue (after the result handler thread
+        # terminated, hence not draining it anymore).
+
+        t_start = time.time()
+
+        with self.assertRaises(ValueError):
+            with self.Pool(2) as p:
+                try:
+                    p.map(raise_large_valuerror, [0, 1])
+                finally:
+                    time.sleep(0.5)
+                    p.close()
+                    p.join()
+
+        # check that we indeed waited for all jobs
+        self.assertGreater(time.time() - t_start, 0.9)
+
 
 def raising():
     raise KeyError("key")
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -179,6 +179,8 @@
 Library
 -------
 
+- Issue #23992: multiprocessing: make MapResult not fail-fast upon exception.
+
 - Issue #26243: Support keyword arguments to zlib.compress().  Patch by Aviv
   Palivoda.
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list