[Python-checkins] bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)

Antoine Pitrou webhook-mailer at python.org
Fri Jan 5 05:15:58 EST 2018


https://github.com/python/cpython/commit/94459fd7dc25ce19096f2080eb7339497d319eb0
commit: 94459fd7dc25ce19096f2080eb7339497d319eb0
branch: master
author: Thomas Moreau <thomas.moreau.2010 at gmail.com>
committer: Antoine Pitrou <pitrou at free.fr>
date: 2018-01-05T11:15:54+01:00
summary:

bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)

Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors.
This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.

files:
A Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst
M Lib/concurrent/futures/process.py
M Lib/multiprocessing/queues.py
M Lib/test/_test_multiprocessing.py
M Lib/test/test_concurrent_futures.py

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 35af65d0bee..aaa5151e017 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -8,10 +8,10 @@
 |======================= In-process =====================|== Out-of-process ==|
 
 +----------+     +----------+       +--------+     +-----------+    +---------+
-|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
-|          |     +----------+       |        |     +-----------+    |         |
-|          |     | ...      |       |        |     | ...       |    |         |
-|          |     | 6        |       |        |     | 5, call() |    |         |
+|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
+|          |     +----------+       |        |     +-----------+    |  Pool   |
+|          |     | ...      |       |        |     | ...       |    +---------+
+|          |     | 6        |    => |        |  => | 5, call() | => |         |
 |          |     | 7        |       |        |     | ...       |    |         |
 | Process  |     | ...      |       | Local  |     +-----------+    | Process |
 |  Pool    |     +----------+       | Worker |                      |  #1..n  |
@@ -52,6 +52,7 @@
 from queue import Full
 import multiprocessing as mp
 from multiprocessing.connection import wait
+from multiprocessing.queues import Queue
 import threading
 import weakref
 from functools import partial
@@ -72,16 +73,31 @@
 # workers to exit when their work queues are empty and then waits until the
 # threads/processes finish.
 
-_threads_queues = weakref.WeakKeyDictionary()
+_threads_wakeups = weakref.WeakKeyDictionary()
 _global_shutdown = False
 
+
+class _ThreadWakeup:
+    __slots__ = ("_reader", "_writer")  # pipe ends used to interrupt wait()
+
+    def __init__(self):
+        self._reader, self._writer = mp.Pipe(duplex=False)
+
+    def wakeup(self):
+        self._writer.send_bytes(b"")  # empty message: only readiness matters
+
+    def clear(self):
+        while self._reader.poll():  # drain every pending wakeup message
+            self._reader.recv_bytes()
+
+
 def _python_exit():
     global _global_shutdown
     _global_shutdown = True
-    items = list(_threads_queues.items())
-    for t, q in items:
-        q.put(None)
-    for t, q in items:
+    items = list(_threads_wakeups.items())
+    for _, thread_wakeup in items:
+        thread_wakeup.wakeup()
+    for t, _ in items:
         t.join()
 
 # Controls how many more calls than processes will be queued in the call queue.
@@ -90,6 +106,7 @@ def _python_exit():
 # (Futures in the call queue cannot be cancelled).
 EXTRA_QUEUED_CALLS = 1
 
+
 # Hack to embed stringification of remote traceback in local traceback
 
 class _RemoteTraceback(Exception):
@@ -132,6 +149,25 @@ def __init__(self, work_id, fn, args, kwargs):
         self.kwargs = kwargs
 
 
+class _SafeQueue(Queue):
+    """Queue that fails the matching future when feeding a _CallItem raises."""
+    def __init__(self, max_size=0, *, ctx, pending_work_items):
+        self.pending_work_items = pending_work_items
+        super().__init__(max_size, ctx=ctx)
+
+    def _on_queue_feeder_error(self, e, obj):
+        if isinstance(obj, _CallItem):
+            tb = traceback.format_exception(type(e), e, e.__traceback__)
+            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
+            work_item = self.pending_work_items.pop(obj.work_id, None)
+            # work_item can be None if another process terminated. In this case,
+            # the queue_manager_thread fails all work_items with BrokenProcessPool
+            if work_item is not None:
+                work_item.future.set_exception(e)
+        else:
+            super()._on_queue_feeder_error(e, obj)
+
+
 def _get_chunks(*iterables, chunksize):
     """ Iterates over zip()ed iterables in chunks. """
     it = zip(*iterables)
@@ -152,6 +188,17 @@ def _process_chunk(fn, chunk):
     """
     return [fn(*args) for args in chunk]
 
+
+def _sendback_result(result_queue, work_id, result=None, exception=None):
+    """Send back a result or exception; a failing put() is reported instead."""
+    try:
+        result_queue.put(_ResultItem(work_id, result=result,
+                                     exception=exception))
+    except BaseException as e:
+        exc = _ExceptionWithTraceback(e, e.__traceback__)
+        result_queue.put(_ResultItem(work_id, exception=exc))
+
+
 def _process_worker(call_queue, result_queue, initializer, initargs):
     """Evaluates calls from call_queue and places the results in result_queue.
 
@@ -183,10 +230,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
             r = call_item.fn(*call_item.args, **call_item.kwargs)
         except BaseException as e:
             exc = _ExceptionWithTraceback(e, e.__traceback__)
-            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
+            _sendback_result(result_queue, call_item.work_id, exception=exc)
         else:
-            result_queue.put(_ResultItem(call_item.work_id,
-                                         result=r))
+            _sendback_result(result_queue, call_item.work_id, result=r)
 
         # Liberate the resource as soon as possible, to avoid holding onto
         # open files or shared memory that is not needed anymore
@@ -230,12 +276,14 @@ def _add_call_item_to_queue(pending_work_items,
                 del pending_work_items[work_id]
                 continue
 
+
 def _queue_management_worker(executor_reference,
                              processes,
                              pending_work_items,
                              work_ids_queue,
                              call_queue,
-                             result_queue):
+                             result_queue,
+                             thread_wakeup):
     """Manages the communication between this process and the worker processes.
 
     This function is run in a local thread.
@@ -253,6 +301,9 @@ def _queue_management_worker(executor_reference,
             derived from _WorkItems for processing by the process workers.
         result_queue: A ctx.SimpleQueue of _ResultItems generated by the
             process workers.
+        thread_wakeup: A _ThreadWakeup to allow waking up the
+            queue_manager_thread from the main Thread and avoid deadlocks
+            caused by permanently locked queues.
     """
     executor = None
 
@@ -261,10 +312,21 @@ def shutting_down():
                 or executor._shutdown_thread)
 
     def shutdown_worker():
-        # This is an upper bound
-        nb_children_alive = sum(p.is_alive() for p in processes.values())
-        for i in range(0, nb_children_alive):
-            call_queue.put_nowait(None)
+        # This is an upper bound on the number of children alive.
+        n_children_alive = sum(p.is_alive() for p in processes.values())
+        n_children_to_stop = n_children_alive
+        n_sentinels_sent = 0
+        # Send the right number of sentinels, to make sure all children are
+        # properly terminated.
+        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
+            for i in range(n_children_to_stop - n_sentinels_sent):
+                try:
+                    call_queue.put_nowait(None)
+                    n_sentinels_sent += 1
+                except Full:
+                    break
+            n_children_alive = sum(p.is_alive() for p in processes.values())
+
         # Release the queue's resources as soon as possible.
         call_queue.close()
         # If .join() is not called on the created processes then
@@ -272,19 +334,37 @@ def shutdown_worker():
         for p in processes.values():
             p.join()
 
-    reader = result_queue._reader
+    result_reader = result_queue._reader
+    wakeup_reader = thread_wakeup._reader
+    readers = [result_reader, wakeup_reader]
 
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
 
-        sentinels = [p.sentinel for p in processes.values()]
-        assert sentinels
-        ready = wait([reader] + sentinels)
-        if reader in ready:
-            result_item = reader.recv()
-        else:
+        # Wait for a result to be ready in the result_queue while checking
+        # that all worker processes are still running, or for a wake up
+        # signal. The wake up signals come either from new tasks being
+        # submitted, from the executor being shutdown/gc-ed, or from the
+        # shutdown of the python interpreter.
+        worker_sentinels = [p.sentinel for p in processes.values()]
+        ready = wait(readers + worker_sentinels)
+
+        cause = None
+        is_broken = True
+        if result_reader in ready:
+            try:
+                result_item = result_reader.recv()
+                is_broken = False
+            except BaseException as e:
+                cause = traceback.format_exception(type(e), e, e.__traceback__)
+
+        elif wakeup_reader in ready:
+            is_broken = False
+            result_item = None
+        thread_wakeup.clear()
+        if is_broken:
             # Mark the process pool broken so that submits fail right now.
             executor = executor_reference()
             if executor is not None:
@@ -293,14 +373,15 @@ def shutdown_worker():
                                     'usable anymore')
                 executor._shutdown_thread = True
                 executor = None
+            bpe = BrokenProcessPool("A process in the process pool was "
+                                    "terminated abruptly while the future was "
+                                    "running or pending.")
+            if cause is not None:
+                bpe.__cause__ = _RemoteTraceback(
+                    f"\n'''\n{''.join(cause)}'''")
             # All futures in flight must be marked failed
             for work_id, work_item in pending_work_items.items():
-                work_item.future.set_exception(
-                    BrokenProcessPool(
-                        "A process in the process pool was "
-                        "terminated abruptly while the future was "
-                        "running or pending."
-                    ))
+                work_item.future.set_exception(bpe)
                 # Delete references to object. See issue16284
                 del work_item
             pending_work_items.clear()
@@ -329,6 +410,9 @@ def shutdown_worker():
                     work_item.future.set_result(result_item.result)
                 # Delete references to object. See issue16284
                 del work_item
+            # Delete reference to result_item
+            del result_item
+
         # Check whether we should start shutting down.
         executor = executor_reference()
         # No more work items can be added if:
@@ -348,8 +432,11 @@ def shutdown_worker():
                 pass
         executor = None
 
+
 _system_limits_checked = False
 _system_limited = None
+
+
 def _check_system_limits():
     global _system_limits_checked, _system_limited
     if _system_limits_checked:
@@ -369,7 +456,8 @@ def _check_system_limits():
         # minimum number of semaphores available
         # according to POSIX
         return
-    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+    _system_limited = ("system provides too few semaphores (%d"
+                       " available, 256 necessary)" % nsems_max)
     raise NotImplementedError(_system_limited)
 
 
@@ -415,6 +503,7 @@ def __init__(self, max_workers=None, mp_context=None,
                 raise ValueError("max_workers must be greater than 0")
 
             self._max_workers = max_workers
+
         if mp_context is None:
             mp_context = mp.get_context()
         self._mp_context = mp_context
@@ -424,34 +513,52 @@ def __init__(self, max_workers=None, mp_context=None,
         self._initializer = initializer
         self._initargs = initargs
 
+        # Management thread
+        self._queue_management_thread = None
+
+        # Map of pids to processes
+        self._processes = {}
+
+        # Shutdown is a two-step process.
+        self._shutdown_thread = False
+        self._shutdown_lock = threading.Lock()
+        self._broken = False
+        self._queue_count = 0
+        self._pending_work_items = {}
+
+        # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
         # because futures in the call queue cannot be cancelled.
         queue_size = self._max_workers + EXTRA_QUEUED_CALLS
-        self._call_queue = mp_context.Queue(queue_size)
+        self._call_queue = _SafeQueue(
+            max_size=queue_size, ctx=self._mp_context,
+            pending_work_items=self._pending_work_items)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
         # processes anyway, so silence the tracebacks.
         self._call_queue._ignore_epipe = True
         self._result_queue = mp_context.SimpleQueue()
         self._work_ids = queue.Queue()
-        self._queue_management_thread = None
-        # Map of pids to processes
-        self._processes = {}
 
-        # Shutdown is a two-step process.
-        self._shutdown_thread = False
-        self._shutdown_lock = threading.Lock()
-        self._broken = False
-        self._queue_count = 0
-        self._pending_work_items = {}
+        # _ThreadWakeup is a communication channel used to interrupt the wait
+        # of the main loop of queue_manager_thread from another thread (e.g.
+        # when calling executor.submit or executor.shutdown). We do not use the
+        # _result_queue to send the wakeup signal to the queue_manager_thread
+        # as it could result in a deadlock if a worker process dies with the
+        # _result_queue write lock still acquired.
+        self._queue_management_thread_wakeup = _ThreadWakeup()
 
     def _start_queue_management_thread(self):
-        # When the executor gets lost, the weakref callback will wake up
-        # the queue management thread.
-        def weakref_cb(_, q=self._result_queue):
-            q.put(None)
         if self._queue_management_thread is None:
+            # When the executor gets garbage collected, the weakref callback
+            # will wake up the queue management thread so that it can terminate
+            # if there is no pending work item.
+            def weakref_cb(_,
+                           thread_wakeup=self._queue_management_thread_wakeup):
+                mp.util.debug('Executor collected: triggering callback for'
+                              ' QueueManager wakeup')
+                thread_wakeup.wakeup()
             # Start the processes so that their sentinels are known.
             self._adjust_process_count()
             self._queue_management_thread = threading.Thread(
@@ -461,10 +568,13 @@ def weakref_cb(_, q=self._result_queue):
                       self._pending_work_items,
                       self._work_ids,
                       self._call_queue,
-                      self._result_queue))
+                      self._result_queue,
+                      self._queue_management_thread_wakeup),
+                name="QueueManagerThread")
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
-            _threads_queues[self._queue_management_thread] = self._result_queue
+            _threads_wakeups[self._queue_management_thread] = \
+                self._queue_management_thread_wakeup
 
     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
@@ -491,7 +601,7 @@ def submit(self, fn, *args, **kwargs):
             self._work_ids.put(self._queue_count)
             self._queue_count += 1
             # Wake up queue management thread
-            self._result_queue.put(None)
+            self._queue_management_thread_wakeup.wakeup()
 
             self._start_queue_management_thread()
             return f
@@ -531,7 +641,7 @@ def shutdown(self, wait=True):
             self._shutdown_thread = True
         if self._queue_management_thread:
             # Wake up queue management thread
-            self._result_queue.put(None)
+            self._queue_management_thread_wakeup.wakeup()
             if wait:
                 self._queue_management_thread.join()
         # To reduce the risk of opening too many files, remove references to
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 328efbd95fe..d66d37a5c3e 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -160,9 +160,10 @@ def _start_thread(self):
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send_bytes,
-                  self._wlock, self._writer.close, self._ignore_epipe),
+                  self._wlock, self._writer.close, self._ignore_epipe,
+                  self._on_queue_feeder_error),
             name='QueueFeederThread'
-            )
+        )
         self._thread.daemon = True
 
         debug('doing self._thread.start()')
@@ -201,7 +202,8 @@ def _finalize_close(buffer, notempty):
             notempty.notify()
 
     @staticmethod
-    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
+    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
+              onerror):
         debug('starting thread to feed data to pipe')
         nacquire = notempty.acquire
         nrelease = notempty.release
@@ -253,8 +255,17 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
                     info('error in queue thread: %s', e)
                     return
                 else:
-                    import traceback
-                    traceback.print_exc()
+                    onerror(e, obj)
+
+    @staticmethod
+    def _on_queue_feeder_error(e, obj):
+        """
+        Private API hook called when feeding data in the background thread
+        raises an exception.  For overriding by concurrent.futures.
+        """
+        import traceback
+        traceback.print_exc()
+
 
 _sentinel = object()
 
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 7575c5d3681..05166b91ba8 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1029,6 +1029,43 @@ def __reduce__(self):
             self.assertTrue(q.get(timeout=1.0))
             close_queue(q)
 
+    def test_queue_feeder_on_queue_feeder_error(self):
+        # bpo-30006: verify feeder handles exceptions using the
+        # _on_queue_feeder_error hook.
+        if self.TYPE != 'processes':
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+        class NotSerializable(object):
+            """Mock unserializable object"""
+            def __init__(self):
+                self.reduce_was_called = False
+                self.on_queue_feeder_error_was_called = False
+
+            def __reduce__(self):
+                self.reduce_was_called = True
+                raise AttributeError
+
+        class SafeQueue(multiprocessing.queues.Queue):
+            """Queue with overloaded _on_queue_feeder_error hook"""
+            @staticmethod
+            def _on_queue_feeder_error(e, obj):
+                if (isinstance(e, AttributeError) and
+                        isinstance(obj, NotSerializable)):
+                    obj.on_queue_feeder_error_was_called = True
+
+        not_serializable_obj = NotSerializable()
+        # The captured_stderr reduces the noise in the test report
+        with test.support.captured_stderr():
+            q = SafeQueue(ctx=multiprocessing.get_context())
+            q.put(not_serializable_obj)
+
+            # Verify that q is still functioning correctly
+            q.put(True)
+            self.assertTrue(q.get(timeout=1.0))
+
+        # Assert that the serialization and the hook have been called correctly
+        self.assertTrue(not_serializable_obj.reduce_was_called)
+        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
 #
 #
 #
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 76878992f9a..675cd7ae05e 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -18,6 +18,7 @@
 import time
 import unittest
 import weakref
+from pickle import PicklingError
 
 from concurrent import futures
 from concurrent.futures._base import (
@@ -394,16 +395,17 @@ def test_del_shutdown(self):
         queue_management_thread = executor._queue_management_thread
         processes = executor._processes
         call_queue = executor._call_queue
+        queue_management_thread = executor._queue_management_thread
         del executor
 
+        # Make sure that all the executor resources were properly cleaned by
+        # the shutdown process
         queue_management_thread.join()
         for p in processes.values():
             p.join()
-        call_queue.close()
         call_queue.join_thread()
 
 
-
 create_executor_tests(ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
                                        ProcessPoolForkserverMixin,
@@ -784,6 +786,172 @@ def test_ressources_gced_in_workers(self):
                                        ProcessPoolForkserverMixin,
                                        ProcessPoolSpawnMixin))
 
+def hide_process_stderr():
+    import io
+    sys.stderr = io.StringIO()
+
+
+def _crash(delay=None):
+    """Induces a segfault."""
+    if delay:
+        time.sleep(delay)
+    import faulthandler
+    faulthandler.disable()
+    faulthandler._sigsegv()
+
+
+def _exit():
+    """Induces a sys exit with exitcode 1."""
+    sys.exit(1)
+
+
+def _raise_error(Err):
+    """Function that raises an Exception in process."""
+    hide_process_stderr()
+    raise Err()
+
+
+def _return_instance(cls):
+    """Function that returns an instance of cls."""
+    hide_process_stderr()
+    return cls()
+
+
+class CrashAtPickle(object):
+    """Bad object that triggers a segfault at pickling time."""
+    def __reduce__(self):
+        _crash()
+
+
+class CrashAtUnpickle(object):
+    """Bad object that triggers a segfault at unpickling time."""
+    def __reduce__(self):
+        return _crash, ()
+
+
+class ExitAtPickle(object):
+    """Bad object that triggers a process exit at pickling time."""
+    def __reduce__(self):
+        _exit()
+
+
+class ExitAtUnpickle(object):
+    """Bad object that triggers a process exit at unpickling time."""
+    def __reduce__(self):
+        return _exit, ()
+
+
+class ErrorAtPickle(object):
+    """Bad object that triggers an error at pickling time."""
+    def __reduce__(self):
+        from pickle import PicklingError
+        raise PicklingError("Error in pickle")
+
+
+class ErrorAtUnpickle(object):
+    """Bad object that triggers an error at unpickling time."""
+    def __reduce__(self):
+        from pickle import UnpicklingError
+        return _raise_error, (UnpicklingError, )
+
+
+class ExecutorDeadlockTest:
+    TIMEOUT = 15
+
+    @classmethod
+    def _sleep_id(cls, x, delay):
+        time.sleep(delay)
+        return x
+
+    def _fail_on_deadlock(self, executor):
+        # If we did not recover before TIMEOUT seconds, consider that the
+        # executor is in a deadlock state and forcefully clean all its
+        # components.
+        import faulthandler
+        from tempfile import TemporaryFile
+        with TemporaryFile(mode="w+") as f:
+            faulthandler.dump_traceback(file=f)
+            f.seek(0)
+            tb = f.read()
+        for p in executor._processes.values():
+            p.terminate()
+        # This should be safe to call executor.shutdown here as all possible
+        # deadlocks should have been broken.
+        executor.shutdown(wait=True)
+        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
+        self.fail(f"Executor deadlock:\n\n{tb}")
+
+
+    def test_crash(self):
+        # extensive testing for deadlock caused by crashes in a pool.
+        self.executor.shutdown(wait=True)
+        crash_cases = [
+            # Check problem occurring while pickling a task in
+            # the task_handler thread
+            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
+            # Check problem occurring while unpickling a task on workers
+            (id, (ExitAtUnpickle(),), BrokenProcessPool,
+             "exit at task unpickle"),
+            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
+             "error at task unpickle"),
+            (id, (CrashAtUnpickle(),), BrokenProcessPool,
+             "crash at task unpickle"),
+            # Check problem occurring during func execution on workers
+            (_crash, (), BrokenProcessPool,
+             "crash during func execution on worker"),
+            (_exit, (), SystemExit,
+             "exit during func execution on worker"),
+            (_raise_error, (RuntimeError, ), RuntimeError,
+             "error during func execution on worker"),
+            # Check problem occurring while pickling a task result
+            # on workers
+            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
+             "crash during result pickle on worker"),
+            (_return_instance, (ExitAtPickle,), SystemExit,
+             "exit during result pickle on worker"),
+            (_return_instance, (ErrorAtPickle,), PicklingError,
+             "error during result pickle on worker"),
+            # Check problem occurring while unpickling a task result in
+            # the result_handler thread
+            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
+             "error during result unpickle in result_handler"),
+            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
+             "exit during result unpickle in result_handler")
+        ]
+        for func, args, error, name in crash_cases:
+            with self.subTest(name):
+                # The captured_stderr reduces the noise in the test report
+                with test.support.captured_stderr():
+                    executor = self.executor_type(
+                        max_workers=2, mp_context=get_context(self.ctx))
+                    res = executor.submit(func, *args)
+                    with self.assertRaises(error):
+                        try:
+                            res.result(timeout=self.TIMEOUT)
+                        except futures.TimeoutError:
+                            # If we did not recover before TIMEOUT seconds,
+                            # consider that the executor is in a deadlock state
+                            self._fail_on_deadlock(executor)
+                    executor.shutdown(wait=True)
+
+    def test_shutdown_deadlock(self):
+        # Test that the pool calling shutdown does not cause a deadlock
+        # if a worker fails after the shutdown call.
+        self.executor.shutdown(wait=True)
+        with self.executor_type(max_workers=2,
+                                mp_context=get_context(self.ctx)) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+            f = executor.submit(_crash, delay=.1)
+            executor.shutdown(wait=True)
+            with self.assertRaises(BrokenProcessPool):
+                f.result()
+
+
+create_executor_tests(ExecutorDeadlockTest,
+                      executor_mixins=(ProcessPoolForkMixin,
+                                       ProcessPoolForkserverMixin,
+                                       ProcessPoolSpawnMixin))
+
 
 class FutureTests(BaseTestCase):
     def test_done_callback_with_result(self):
diff --git a/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst
new file mode 100644
index 00000000000..49cbbb3b920
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst
@@ -0,0 +1,4 @@
+Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when
+task arguments or results cause pickling or unpickling errors.
+This should make sure that calls to the :class:`ProcessPoolExecutor` API
+always eventually return.



More information about the Python-checkins mailing list