[Python-checkins] cpython (merge default -> default): merge heads

benjamin.peterson python-checkins at python.org
Sat Jul 16 04:06:05 CEST 2011


http://hg.python.org/cpython/rev/db88cc5a4f6e
changeset:   71376:db88cc5a4f6e
parent:      71375:b23ad0a6cf87
parent:      71374:dfaa3a149a92
user:        Benjamin Peterson <benjamin at python.org>
date:        Fri Jul 15 21:10:44 2011 -0500
summary:
  merge heads

files:
  Lib/concurrent/futures/process.py   |  12 +++-
  Lib/multiprocessing/queues.py       |   9 ++-
  Lib/packaging/tests/__main__.py     |  15 +++--
  Lib/test/support.py                 |  19 ++++--
  Lib/test/test_concurrent_futures.py |   3 +-
  Lib/test/test_os.py                 |   1 +
  Lib/test/test_pydoc.py              |  47 +++++++++-------
  Lib/test/test_threaded_import.py    |  11 ++-
  Lib/test/threaded_import_hangers.py |  13 ++-
  Misc/NEWS                           |   6 ++
  10 files changed, 88 insertions(+), 48 deletions(-)


diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -205,6 +205,8 @@
         nb_children_alive = sum(p.is_alive() for p in processes.values())
         for i in range(0, nb_children_alive):
             call_queue.put_nowait(None)
+        # Release the queue's resources as soon as possible.
+        call_queue.close()
         # If .join() is not called on the created processes then
         # some multiprocessing.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
@@ -239,14 +241,14 @@
             # locks may be in a dirty state and block forever.
             for p in processes.values():
                 p.terminate()
-            for p in processes.values():
-                p.join()
+            shutdown_worker()
             return
         if isinstance(result_item, int):
             # Clean shutdown of a worker using its PID
             # (avoids marking the executor broken)
             assert shutting_down()
-            del processes[result_item]
+            p = processes.pop(result_item)
+            p.join()
             if not processes:
                 shutdown_worker()
                 return
@@ -334,6 +336,10 @@
         # because futures in the call queue cannot be cancelled.
         self._call_queue = multiprocessing.Queue(self._max_workers +
                                                  EXTRA_QUEUED_CALLS)
+        # Killed worker processes can produce spurious "broken pipe"
+        # tracebacks in the queue's own worker thread. But we detect killed
+        # processes anyway, so silence the tracebacks.
+        self._call_queue._ignore_epipe = True
         self._result_queue = SimpleQueue()
         self._work_ids = queue.Queue()
         self._queue_management_thread = None
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -41,6 +41,7 @@
 import time
 import atexit
 import weakref
+import errno
 
 from queue import Empty, Full
 import _multiprocessing
@@ -67,6 +68,8 @@
         else:
             self._wlock = Lock()
         self._sem = BoundedSemaphore(maxsize)
+        # For use by concurrent.futures
+        self._ignore_epipe = False
 
         self._after_fork()
 
@@ -178,7 +181,7 @@
         self._thread = threading.Thread(
             target=Queue._feed,
             args=(self._buffer, self._notempty, self._send,
-                  self._wlock, self._writer.close),
+                  self._wlock, self._writer.close, self._ignore_epipe),
             name='QueueFeederThread'
             )
         self._thread.daemon = True
@@ -229,7 +232,7 @@
             notempty.release()
 
     @staticmethod
-    def _feed(buffer, notempty, send, writelock, close):
+    def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
         from .util import is_exiting
 
@@ -271,6 +274,8 @@
                 except IndexError:
                     pass
         except Exception as e:
+            if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+                return
             # Since this runs in a daemon thread the resources it uses
             # may be become unusable while the process is cleaning up.
             # We ignore errors which happen after the process has
diff --git a/Lib/packaging/tests/__main__.py b/Lib/packaging/tests/__main__.py
--- a/Lib/packaging/tests/__main__.py
+++ b/Lib/packaging/tests/__main__.py
@@ -5,15 +5,18 @@
 import os
 import sys
 import unittest
-from test.support import run_unittest, reap_children
+from test.support import run_unittest, reap_children, reap_threads
 
 
+ at reap_threads
 def test_main():
-    start_dir = os.path.dirname(__file__)
-    top_dir = os.path.dirname(os.path.dirname(start_dir))
-    test_loader = unittest.TestLoader()
-    run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir))
-    reap_children()
+    try:
+        start_dir = os.path.dirname(__file__)
+        top_dir = os.path.dirname(os.path.dirname(start_dir))
+        test_loader = unittest.TestLoader()
+        run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir))
+    finally:
+        reap_children()
 
 
 if __name__ == '__main__':
diff --git a/Lib/test/support.py b/Lib/test/support.py
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -24,9 +24,15 @@
 import logging.handlers
 
 try:
-    import _thread
+    import _thread, threading
 except ImportError:
     _thread = None
+    threading = None
+try:
+    import multiprocessing.process
+except ImportError:
+    multiprocessing = None
+
 
 try:
     import zlib
@@ -1358,19 +1364,20 @@
 
 def threading_setup():
     if _thread:
-        return _thread._count(),
+        return _thread._count(), threading._dangling.copy()
     else:
-        return 1,
+        return 1, ()
 
-def threading_cleanup(nb_threads):
+def threading_cleanup(*original_values):
     if not _thread:
         return
     _MAX_COUNT = 10
     for count in range(_MAX_COUNT):
-        n = _thread._count()
-        if n == nb_threads:
+        values = _thread._count(), threading._dangling
+        if values == original_values:
             break
         time.sleep(0.1)
+        gc_collect()
     # XXX print a warning in case of failure?
 
 def reap_threads(func):
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -634,7 +634,8 @@
                                   ThreadPoolAsCompletedTests,
                                   FutureTests,
                                   ProcessPoolShutdownTest,
-                                  ThreadPoolShutdownTest)
+                                  ThreadPoolShutdownTest,
+                                  )
     finally:
         test.support.reap_children()
 
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -1506,6 +1506,7 @@
                         raise
 
 
+ at support.reap_threads
 def test_main():
     support.run_unittest(
         FileTests,
diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py
--- a/Lib/test/test_pydoc.py
+++ b/Lib/test/test_pydoc.py
@@ -15,9 +15,12 @@
 from io import StringIO
 from collections import namedtuple
 from contextlib import contextmanager
-from test.support import TESTFN, forget, rmtree, EnvironmentVarGuard, \
-     reap_children, captured_output, captured_stdout, unlink
 
+from test.script_helper import assert_python_ok
+from test.support import (
+    TESTFN, forget, rmtree, EnvironmentVarGuard,
+    reap_children, reap_threads, captured_output, captured_stdout, unlink
+)
 from test import pydoc_mod
 
 try:
@@ -199,17 +202,14 @@
 # output pattern for module with bad imports
 badimport_pattern = "problem in %s - ImportError: No module named %r"
 
-def run_pydoc(module_name, *args):
+def run_pydoc(module_name, *args, **env):
     """
     Runs pydoc on the specified module. Returns the stripped
     output of pydoc.
     """
-    cmd = [sys.executable, pydoc.__file__, " ".join(args), module_name]
-    try:
-        output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]
-        return output.strip()
-    finally:
-        reap_children()
+    args = args + (module_name,)
+    rc, out, err = assert_python_ok(pydoc.__file__, *args, **env)
+    return out.strip()
 
 def get_pydoc_html(module):
     "Returns pydoc generated output as html"
@@ -312,19 +312,20 @@
         def newdirinpath(dir):
             os.mkdir(dir)
             sys.path.insert(0, dir)
-            yield
-            sys.path.pop(0)
-            rmtree(dir)
+            try:
+                yield
+            finally:
+                sys.path.pop(0)
+                rmtree(dir)
 
-        with newdirinpath(TESTFN), EnvironmentVarGuard() as env:
-            env['PYTHONPATH'] = TESTFN
+        with newdirinpath(TESTFN):
             fullmodname = os.path.join(TESTFN, modname)
             sourcefn = fullmodname + os.extsep + "py"
             for importstring, expectedinmsg in testpairs:
                 with open(sourcefn, 'w') as f:
                     f.write("import {}\n".format(importstring))
                 try:
-                    result = run_pydoc(modname).decode("ascii")
+                    result = run_pydoc(modname, PYTHONPATH=TESTFN).decode("ascii")
                 finally:
                     forget(modname)
                 expected = badimport_pattern % (modname, expectedinmsg)
@@ -494,13 +495,17 @@
         self.assertEqual(sorted(pydoc.Helper.keywords),
                          sorted(keyword.kwlist))
 
+ at reap_threads
 def test_main():
-    test.support.run_unittest(PydocDocTest,
-                              TestDescriptions,
-                              PydocServerTest,
-                              PydocUrlHandlerTest,
-                              TestHelper,
-                              )
+    try:
+        test.support.run_unittest(PydocDocTest,
+                                  TestDescriptions,
+                                  PydocServerTest,
+                                  PydocUrlHandlerTest,
+                                  TestHelper,
+                                  )
+    finally:
+        reap_children()
 
 if __name__ == "__main__":
     test_main()
diff --git a/Lib/test/test_threaded_import.py b/Lib/test/test_threaded_import.py
--- a/Lib/test/test_threaded_import.py
+++ b/Lib/test/test_threaded_import.py
@@ -11,8 +11,8 @@
 import time
 import shutil
 import unittest
-from test.support import verbose, import_module, run_unittest, TESTFN
-thread = import_module('_thread')
+from test.support import (
+    verbose, import_module, run_unittest, TESTFN, reap_threads)
 threading = import_module('threading')
 
 def task(N, done, done_tasks, errors):
@@ -62,7 +62,7 @@
     def __init__(self):
         self.numcalls = 0
         self.x = 0
-        self.lock = thread.allocate_lock()
+        self.lock = threading.Lock()
 
     def find_module(self, name, path=None):
         # Simulate some thread-unsafe behaviour. If calls to find_module()
@@ -113,7 +113,9 @@
             done_tasks = []
             done.clear()
             for i in range(N):
-                thread.start_new_thread(task, (N, done, done_tasks, errors,))
+                t = threading.Thread(target=task,
+                                     args=(N, done, done_tasks, errors,))
+                t.start()
             done.wait(60)
             self.assertFalse(errors)
             if verbose:
@@ -203,6 +205,7 @@
         self.assertEqual(set(results), {'a', 'b'})
 
 
+ at reap_threads
 def test_main():
     run_unittest(ThreadedImportTests)
 
diff --git a/Lib/test/threaded_import_hangers.py b/Lib/test/threaded_import_hangers.py
--- a/Lib/test/threaded_import_hangers.py
+++ b/Lib/test/threaded_import_hangers.py
@@ -35,8 +35,11 @@
         ("os.path.abspath", os.path.abspath, ('.',)),
         ]:
 
-    t = Worker(func, args)
-    t.start()
-    t.join(TIMEOUT)
-    if t.is_alive():
-        errors.append("%s appeared to hang" % name)
+    try:
+        t = Worker(func, args)
+        t.start()
+        t.join(TIMEOUT)
+        if t.is_alive():
+            errors.append("%s appeared to hang" % name)
+    finally:
+        del t
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -231,6 +231,12 @@
 Library
 -------
 
+- Silence spurious "broken pipe" tracebacks when shutting down a
+  ProcessPoolExecutor.
+
+- Fix potential resource leaks in concurrent.futures.ProcessPoolExecutor
+  by joining all queues and processes when shutdown() is called.
+
 - Issue #11603: Fix a crash when __str__ is rebound as __repr__.  Patch by
   Andreas Stührk.
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list