[Python-checkins] cpython (2.7): Issue #19425 -- a pickling error should not cause pool to hang.

richard.oudkerk python-checkins at python.org
Tue Oct 29 00:24:23 CET 2013


http://hg.python.org/cpython/rev/6aa42fc0c2f6
changeset:   86706:6aa42fc0c2f6
branch:      2.7
parent:      86701:cd95f1276360
user:        Richard Oudkerk <shibturn at gmail.com>
date:        Mon Oct 28 23:02:22 2013 +0000
summary:
  Issue #19425 -- a pickling error should not cause pool to hang.

files:
  Lib/multiprocessing/pool.py      |  14 +++++++++-----
  Lib/test/test_multiprocessing.py |  10 ++++++++++
  2 files changed, 19 insertions(+), 5 deletions(-)


diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -169,7 +169,8 @@
 
         self._task_handler = threading.Thread(
             target=Pool._handle_tasks,
-            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+            args=(self._taskqueue, self._quick_put, self._outqueue,
+                  self._pool, self._cache)
             )
         self._task_handler.daemon = True
         self._task_handler._state = RUN
@@ -329,7 +330,7 @@
         debug('worker handler exiting')
 
     @staticmethod
-    def _handle_tasks(taskqueue, put, outqueue, pool):
+    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
         thread = threading.current_thread()
 
         for taskseq, set_length in iter(taskqueue.get, None):
@@ -340,9 +341,12 @@
                     break
                 try:
                     put(task)
-                except IOError:
-                    debug('could not put task on queue')
-                    break
+                except Exception as e:
+                    job, ind = task[:2]
+                    try:
+                        cache[job]._set(ind, (False, e))
+                    except KeyError:
+                        pass
             else:
                 if set_length:
                     debug('doing set_length()')
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1117,6 +1117,16 @@
         self.assertEqual(pmap(sqr, range(100), chunksize=20),
                          map(sqr, range(100)))
 
+    def test_map_unplicklable(self):
+        # Issue #19425 -- failure to pickle should not cause a hang
+        if self.TYPE == 'threads':
+            return
+        class A(object):
+            def __reduce__(self):
+                raise RuntimeError('cannot pickle')
+        with self.assertRaises(RuntimeError):
+            self.pool.map(sqr, [A()]*10)
+
     def test_map_chunksize(self):
         try:
             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list