[Python-checkins] cpython (2.7): Issue #19425 -- a pickling error should not cause pool to hang.
richard.oudkerk
python-checkins at python.org
Tue Oct 29 00:24:23 CET 2013
http://hg.python.org/cpython/rev/6aa42fc0c2f6
changeset: 86706:6aa42fc0c2f6
branch: 2.7
parent: 86701:cd95f1276360
user: Richard Oudkerk <shibturn at gmail.com>
date: Mon Oct 28 23:02:22 2013 +0000
summary:
Issue #19425 -- a pickling error should not cause pool to hang.
files:
Lib/multiprocessing/pool.py | 14 +++++++++-----
Lib/test/test_multiprocessing.py | 10 ++++++++++
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -169,7 +169,8 @@
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
- args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+ args=(self._taskqueue, self._quick_put, self._outqueue,
+ self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
@@ -329,7 +330,7 @@
debug('worker handler exiting')
@staticmethod
- def _handle_tasks(taskqueue, put, outqueue, pool):
+ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
@@ -340,9 +341,12 @@
break
try:
put(task)
- except IOError:
- debug('could not put task on queue')
- break
+ except Exception as e:
+ job, ind = task[:2]
+ try:
+ cache[job]._set(ind, (False, e))
+ except KeyError:
+ pass
else:
if set_length:
debug('doing set_length()')
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1117,6 +1117,16 @@
self.assertEqual(pmap(sqr, range(100), chunksize=20),
map(sqr, range(100)))
+ def test_map_unplicklable(self):
+ # Issue #19425 -- failure to pickle should not cause a hang
+ if self.TYPE == 'threads':
+ return
+ class A(object):
+ def __reduce__(self):
+ raise RuntimeError('cannot pickle')
+ with self.assertRaises(RuntimeError):
+ self.pool.map(sqr, [A()]*10)
+
def test_map_chunksize(self):
try:
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
--
Repository URL: http://hg.python.org/cpython
More information about the Python-checkins
mailing list