[Python-checkins] Silence error output in test_concurrent_futures (bpo-21423) (#4347)

Antoine Pitrou webhook-mailer at python.org
Thu Nov 9 09:33:46 EST 2017


https://github.com/python/cpython/commit/0a2ff23fe6efb1653d655ac19d0a4e1629fd8d95
commit: 0a2ff23fe6efb1653d655ac19d0a4e1629fd8d95
branch: master
author: Antoine Pitrou <pitrou at free.fr>
committer: GitHub <noreply at github.com>
date: 2017-11-09T15:33:43+01:00
summary:

Silence error output in test_concurrent_futures (bpo-21423) (#4347)

* Silence error output in test_concurrent_futures (bpo-21423)

files:
M Lib/test/test_concurrent_futures.py

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 296398f0d94..76878992f9a 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -9,7 +9,10 @@
 
 import contextlib
 import itertools
+import logging
+from logging.handlers import QueueHandler
 import os
+import queue
 import sys
 import threading
 import time
@@ -61,7 +64,12 @@ def init(x):
 def get_init_status():
     return INITIALIZER_STATUS
 
-def init_fail():
+def init_fail(log_queue=None):
+    if log_queue is not None:
+        logger = logging.getLogger('concurrent.futures')
+        logger.addHandler(QueueHandler(log_queue))
+        logger.setLevel('CRITICAL')
+        logger.propagate = False
     time.sleep(0.1)  # let some futures be scheduled
     raise ValueError('error in initializer')
 
@@ -101,18 +109,15 @@ def setUp(self):
         super().setUp()
 
         self.t1 = time.time()
-        try:
-            if hasattr(self, "ctx"):
-                self.executor = self.executor_type(
-                    max_workers=self.worker_count,
-                    mp_context=get_context(self.ctx),
-                    **self.executor_kwargs)
-            else:
-                self.executor = self.executor_type(
-                    max_workers=self.worker_count,
-                    **self.executor_kwargs)
-        except NotImplementedError as e:
-            self.skipTest(str(e))
+        if hasattr(self, "ctx"):
+            self.executor = self.executor_type(
+                max_workers=self.worker_count,
+                mp_context=self.get_context(),
+                **self.executor_kwargs)
+        else:
+            self.executor = self.executor_type(
+                max_workers=self.worker_count,
+                **self.executor_kwargs)
         self._prime_executor()
 
     def tearDown(self):
@@ -126,6 +131,9 @@ def tearDown(self):
 
         super().tearDown()
 
+    def get_context(self):
+        return get_context(self.ctx)
+
     def _prime_executor(self):
         # Make sure that the executor is ready to do work before running the
         # tests. This should reduce the probability of timeouts in the tests.
@@ -143,10 +151,10 @@ class ProcessPoolForkMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
     ctx = "fork"
 
-    def setUp(self):
+    def get_context(self):
         if sys.platform == "win32":
             self.skipTest("require unix system")
-        super().setUp()
+        return super().get_context()
 
 
 class ProcessPoolSpawnMixin(ExecutorMixin):
@@ -158,10 +166,10 @@ class ProcessPoolForkserverMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
     ctx = "forkserver"
 
-    def setUp(self):
+    def get_context(self):
         if sys.platform == "win32":
             self.skipTest("require unix system")
-        super().setUp()
+        return super().get_context()
 
 
 def create_executor_tests(mixin, bases=(BaseTestCase,),
@@ -206,7 +214,18 @@ class FailingInitializerMixin(ExecutorMixin):
     worker_count = 2
 
     def setUp(self):
-        self.executor_kwargs = dict(initializer=init_fail)
+        if hasattr(self, "ctx"):
+            # Pass a queue to redirect the child's logging output
+            self.mp_context = self.get_context()
+            self.log_queue = self.mp_context.Queue()
+            self.executor_kwargs = dict(initializer=init_fail,
+                                        initargs=(self.log_queue,))
+        else:
+            # In a thread pool, the child shares our logging setup
+            # (see _assert_logged())
+            self.mp_context = None
+            self.log_queue = None
+            self.executor_kwargs = dict(initializer=init_fail)
         super().setUp()
 
     def test_initializer(self):
@@ -234,14 +253,20 @@ def _prime_executor(self):
 
     @contextlib.contextmanager
     def _assert_logged(self, msg):
-        if self.executor_type is futures.ProcessPoolExecutor:
-            # No easy way to catch the child processes' stderr
+        if self.log_queue is not None:
             yield
+            output = []
+            try:
+                while True:
+                    output.append(self.log_queue.get_nowait().getMessage())
+            except queue.Empty:
+                pass
         else:
             with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                 yield
-            self.assertTrue(any(msg in line for line in cm.output),
-                            cm.output)
+            output = cm.output
+        self.assertTrue(any(msg in line for line in output),
+                        output)
 
 
 create_executor_tests(InitializerMixin)



More information about the Python-checkins mailing list