[Python-checkins] cpython (3.4): Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now

serhiy.storchaka python-checkins at python.org
Fri Mar 13 07:32:25 CET 2015


https://hg.python.org/cpython/rev/525ccfcc55f7
changeset:   94971:525ccfcc55f7
branch:      3.4
parent:      94969:16e676fd77c0
user:        Serhiy Storchaka <storchaka at gmail.com>
date:        Fri Mar 13 08:25:26 2015 +0200
summary:
  Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator.  Patch by Alon Diamant and Davin
Potts.

files:
  Lib/multiprocessing/pool.py       |  37 +++++++++-----
  Lib/test/_test_multiprocessing.py |  46 +++++++++++++++++++
  Misc/ACKS                         |   2 +
  Misc/NEWS                         |   4 +
  4 files changed, 75 insertions(+), 14 deletions(-)


diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -374,25 +374,34 @@
         thread = threading.current_thread()
 
         for taskseq, set_length in iter(taskqueue.get, None):
+            task = None
             i = -1
-            for i, task in enumerate(taskseq):
-                if thread._state:
-                    util.debug('task handler found thread._state != RUN')
-                    break
-                try:
-                    put(task)
-                except Exception as e:
-                    job, ind = task[:2]
+            try:
+                for i, task in enumerate(taskseq):
+                    if thread._state:
+                        util.debug('task handler found thread._state != RUN')
+                        break
                     try:
-                        cache[job]._set(ind, (False, e))
-                    except KeyError:
-                        pass
-            else:
+                        put(task)
+                    except Exception as e:
+                        job, ind = task[:2]
+                        try:
+                            cache[job]._set(ind, (False, e))
+                        except KeyError:
+                            pass
+                else:
+                    if set_length:
+                        util.debug('doing set_length()')
+                        set_length(i+1)
+                    continue
+                break
+            except Exception as ex:
+                job, ind = task[:2] if task else (0, 0)
+                if job in cache:
+                    cache[job]._set(ind + 1, (False, ex))
                 if set_length:
                     util.debug('doing set_length()')
                     set_length(i+1)
-                continue
-            break
         else:
             util.debug('task handler got sentinel')
 
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1660,6 +1660,14 @@
 def mul(x, y):
     return x*y
 
+class SayWhenError(ValueError): pass
+
+def exception_throwing_generator(total, when):
+    for i in range(total):
+        if i == when:
+            raise SayWhenError("Somebody said when")
+        yield i
+
 class _TestPool(BaseTestCase):
 
     @classmethod
@@ -1758,6 +1766,25 @@
             self.assertEqual(next(it), i*i)
         self.assertRaises(StopIteration, it.__next__)
 
+    def test_imap_handle_iterable_exception(self):
+        if self.TYPE == 'manager':
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
+        for i in range(3):
+            self.assertEqual(next(it), i*i)
+        self.assertRaises(SayWhenError, it.__next__)
+
+        # SayWhenError seen at start of problematic chunk's results
+        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
+        for i in range(6):
+            self.assertEqual(next(it), i*i)
+        self.assertRaises(SayWhenError, it.__next__)
+        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
+        for i in range(4):
+            self.assertEqual(next(it), i*i)
+        self.assertRaises(SayWhenError, it.__next__)
+
     def test_imap_unordered(self):
         it = self.pool.imap_unordered(sqr, list(range(1000)))
         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
@@ -1765,6 +1792,25 @@
         it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
 
+    def test_imap_unordered_handle_iterable_exception(self):
+        if self.TYPE == 'manager':
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+        it = self.pool.imap_unordered(sqr,
+                                      exception_throwing_generator(10, 3),
+                                      1)
+        with self.assertRaises(SayWhenError):
+            # imap_unordered makes it difficult to anticipate the SayWhenError
+            for i in range(10):
+                self.assertEqual(next(it), i*i)
+
+        it = self.pool.imap_unordered(sqr,
+                                      exception_throwing_generator(20, 7),
+                                      2)
+        with self.assertRaises(SayWhenError):
+            for i in range(20):
+                self.assertEqual(next(it), i*i)
+
     def test_make_pool(self):
         self.assertRaises(ValueError, multiprocessing.Pool, -1)
         self.assertRaises(ValueError, multiprocessing.Pool, 0)
diff --git a/Misc/ACKS b/Misc/ACKS
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -332,6 +332,7 @@
 Caleb Deveraux
 Catherine Devlin
 Scott Dial
+Alon Diamant
 Toby Dickenson
 Mark Dickinson
 Jack Diederich
@@ -1081,6 +1082,7 @@
 Iustin Pop
 Claudiu Popa
 John Popplewell
+Davin Potts
 Guillaume Pratte
 Amrit Prem
 Paul Prescod
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -18,6 +18,10 @@
 Library
 -------
 
+- Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
+  handle exceptions raised by an iterator.  Patch by Alon Diamant and Davin
+  Potts.
+
 - Issue #22928: Disabled HTTP header injections in http.client.
   Original patch by Demian Brecht.
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list