[Python-checkins] cpython: Silence spurious "broken pipe" tracebacks when shutting down a

antoine.pitrou python-checkins at python.org
Sat Jul 16 01:53:50 CEST 2011


http://hg.python.org/cpython/rev/dfaa3a149a92
changeset:   71374:dfaa3a149a92
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Sat Jul 16 01:51:58 2011 +0200
summary:
  Silence spurious "broken pipe" tracebacks when shutting down a ProcessPoolExecutor.

files:
  Lib/concurrent/futures/process.py |  11 +++++++----
  Lib/multiprocessing/queues.py     |   9 +++++++--
  Misc/NEWS                         |   3 +++
  3 files changed, 17 insertions(+), 6 deletions(-)


diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -205,12 +205,12 @@
         nb_children_alive = sum(p.is_alive() for p in processes.values())
         for i in range(0, nb_children_alive):
             call_queue.put_nowait(None)
+        # Release the queue's resources as soon as possible.
+        call_queue.close()
         # If .join() is not called on the created processes then
         # some multiprocessing.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
             p.join()
-        # Release resources held by the queue
-        call_queue.close()
 
     while True:
         _add_call_item_to_queue(pending_work_items,
@@ -241,8 +241,7 @@
             # locks may be in a dirty state and block forever.
             for p in processes.values():
                 p.terminate()
-            for p in processes.values():
-                p.join()
+            shutdown_worker()
             return
         if isinstance(result_item, int):
             # Clean shutdown of a worker using its PID
@@ -337,6 +336,10 @@
         # because futures in the call queue cannot be cancelled.
         self._call_queue = multiprocessing.Queue(self._max_workers +
                                                  EXTRA_QUEUED_CALLS)
+        # Killed worker processes can produce spurious "broken pipe"
+        # tracebacks in the queue's own worker thread. But we detect killed
+        # processes anyway, so silence the tracebacks.
+        self._call_queue._ignore_epipe = True
         self._result_queue = SimpleQueue()
         self._work_ids = queue.Queue()
         self._queue_management_thread = None
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -41,6 +41,7 @@
 import time
 import atexit
 import weakref
+import errno
 
 from queue import Empty, Full
 import _multiprocessing
@@ -67,6 +68,8 @@
         else:
             self._wlock = Lock()
         self._sem = BoundedSemaphore(maxsize)
+        # For use by concurrent.futures
+        self._ignore_epipe = False
 
         self._after_fork()
 
@@ -178,7 +181,7 @@
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send,
-                  self._wlock, self._writer.close),
+                  self._wlock, self._writer.close, self._ignore_epipe),
             name='QueueFeederThread'
             )
         self._thread.daemon = True
@@ -229,7 +232,7 @@
             notempty.release()
 
     @staticmethod
-    def _feed(buffer, notempty, send, writelock, close):
+    def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
         from .util import is_exiting
 
@@ -271,6 +274,8 @@
                 except IndexError:
                     pass
         except Exception as e:
+            if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+                return
             # Since this runs in a daemon thread the resources it uses
             # may be become unusable while the process is cleaning up.
             # We ignore errors which happen after the process has
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -228,6 +228,9 @@
 Library
 -------
 
+- Silence spurious "broken pipe" tracebacks when shutting down a
+  ProcessPoolExecutor.
+
 - Fix potential resource leaks in concurrent.futures.ProcessPoolExecutor
   by joining all queues and processes when shutdown() is called.
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list