[Python-checkins] cpython: Closes issue #11557: Added Natalia Bidart's patch to improve test coverage.

vinay.sajip python-checkins at python.org
Tue Mar 29 18:20:50 CEST 2011


http://hg.python.org/cpython/rev/c7f7672b70a9
changeset:   69039:c7f7672b70a9
user:        Vinay Sajip <vinay_sajip at yahoo.co.uk>
date:        Tue Mar 29 17:20:34 2011 +0100
summary:
  Closes issue #11557: Added Natalia Bidart's patch to improve test coverage.

files:
  Lib/logging/__init__.py  |    8 +-
  Lib/test/test_logging.py |  537 +++++++++++++++++++++++++-
  2 files changed, 518 insertions(+), 27 deletions(-)


diff --git a/Lib/logging/__init__.py b/Lib/logging/__init__.py
--- a/Lib/logging/__init__.py
+++ b/Lib/logging/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2011 by Vinay Sajip. All Rights Reserved.
 #
 # Permission to use, copy, modify, and distribute this software and its
 # documentation for any purpose and without fee is hereby granted,
@@ -18,7 +18,7 @@
 Logging package for Python. Based on PEP 282 and comments thereto in
 comp.lang.python, and influenced by Apache's log4j system.
 
-Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2011 Vinay Sajip. All Rights Reserved.
 
 To use, simply 'import logging' and log away!
 """
@@ -1826,10 +1826,10 @@
     package.
     """
     def handle(self, record):
-        pass
+        """Stub."""
 
     def emit(self, record):
-        pass
+        """Stub."""
 
     def createLock(self):
         self.lock = None
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -40,7 +40,7 @@
 import struct
 import sys
 import tempfile
-from test.support import captured_stdout, run_with_locale, run_unittest
+from test.support import captured_stdout, run_with_locale, run_unittest, patch
 import textwrap
 import unittest
 import warnings
@@ -1082,28 +1082,39 @@
     def test_warnings(self):
         with warnings.catch_warnings():
             logging.captureWarnings(True)
-            try:
-                warnings.filterwarnings("always", category=UserWarning)
-                file = io.StringIO()
-                h = logging.StreamHandler(file)
-                logger = logging.getLogger("py.warnings")
-                logger.addHandler(h)
-                warnings.warn("I'm warning you...")
-                logger.removeHandler(h)
-                s = file.getvalue()
-                h.close()
-                self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
-
-                #See if an explicit file uses the original implementation
-                file = io.StringIO()
-                warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
-                                        file, "Dummy line")
-                s = file.getvalue()
-                file.close()
-                self.assertEqual(s,
-                        "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
-            finally:
-                logging.captureWarnings(False)
+            self.addCleanup(lambda: logging.captureWarnings(False))
+            warnings.filterwarnings("always", category=UserWarning)
+            stream = io.StringIO()
+            h = logging.StreamHandler(stream)
+            logger = logging.getLogger("py.warnings")
+            logger.addHandler(h)
+            warnings.warn("I'm warning you...")
+            logger.removeHandler(h)
+            s = stream.getvalue()
+            h.close()
+            self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
+
+            #See if an explicit file uses the original implementation
+            a_file = io.StringIO()
+            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
+                                 a_file, "Dummy line")
+            s = a_file.getvalue()
+            a_file.close()
+            self.assertEqual(s,
+                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
+
+    def test_warnings_no_handlers(self):
+        with warnings.catch_warnings():
+            logging.captureWarnings(True)
+            self.addCleanup(lambda: logging.captureWarnings(False))
+
+            # confirm our assumption: no loggers are set
+            logger = logging.getLogger("py.warnings")
+            assert logger.handlers == []
+
+            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
+            self.assertTrue(len(logger.handlers) == 1)
+            self.assertIsInstance(logger.handlers[0], logging.NullHandler)
 
 
 def formatFunc(format, datefmt=None):
@@ -2007,6 +2018,11 @@
 
         self.assertEqual(logged, ['should appear in logged'])
 
+    def test_set_log_record_factory(self):
+        man = logging.Manager(None)
+        expected = object()
+        man.setLogRecordFactory(expected)
+        self.assertEqual(man.logRecordFactory, expected)
 
 class ChildLoggerTest(BaseTest):
     def test_child_loggers(self):
@@ -2198,6 +2214,479 @@
             logging.raiseExceptions = old_raise_exceptions
 
 
+class FakeHandler:
+
+    def __init__(self, identifier, called):
+        for method in ('acquire', 'flush', 'close', 'release'):
+            setattr(self, method, self.record_call(identifier, method, called))
+
+    def record_call(self, identifier, method_name, called):
+        def inner():
+            called.append('{} - {}'.format(identifier, method_name))
+        return inner
+
+
+class RecordingHandler(logging.NullHandler):
+
+    def __init__(self, *args, **kwargs):
+        super(RecordingHandler, self).__init__(*args, **kwargs)
+        self.records = []
+
+    def handle(self, record):
+        """Keep track of all the emitted records."""
+        self.records.append(record)
+
+
+class ShutdownTest(BaseTest):
+
+    """Tets suite for the shutdown method."""
+
+    def setUp(self):
+        super(ShutdownTest, self).setUp()
+        self.called = []
+
+        raise_exceptions = logging.raiseExceptions
+        self.addCleanup(lambda: setattr(logging, 'raiseExceptions', raise_exceptions))
+
+    def raise_error(self, error):
+        def inner():
+            raise error()
+        return inner
+
+    def test_no_failure(self):
+        # create some fake handlers
+        handler0 = FakeHandler(0, self.called)
+        handler1 = FakeHandler(1, self.called)
+        handler2 = FakeHandler(2, self.called)
+
+        # create live weakref to those handlers
+        handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
+
+        logging.shutdown(handlerList=list(handlers))
+
+        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
+                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
+                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
+        self.assertEqual(expected, self.called)
+
+    def _test_with_failure_in_method(self, method, error):
+        handler = FakeHandler(0, self.called)
+        setattr(handler, method, self.raise_error(error))
+        handlers = [logging.weakref.ref(handler)]
+
+        logging.shutdown(handlerList=list(handlers))
+
+        self.assertEqual('0 - release', self.called[-1])
+
+    def test_with_ioerror_in_acquire(self):
+        self._test_with_failure_in_method('acquire', IOError)
+
+    def test_with_ioerror_in_flush(self):
+        self._test_with_failure_in_method('flush', IOError)
+
+    def test_with_ioerror_in_close(self):
+        self._test_with_failure_in_method('close', IOError)
+
+    def test_with_valueerror_in_acquire(self):
+        self._test_with_failure_in_method('acquire', ValueError)
+
+    def test_with_valueerror_in_flush(self):
+        self._test_with_failure_in_method('flush', ValueError)
+
+    def test_with_valueerror_in_close(self):
+        self._test_with_failure_in_method('close', ValueError)
+
+    def test_with_other_error_in_acquire_without_raise(self):
+        logging.raiseExceptions = False
+        self._test_with_failure_in_method('acquire', IndexError)
+
+    def test_with_other_error_in_flush_without_raise(self):
+        logging.raiseExceptions = False
+        self._test_with_failure_in_method('flush', IndexError)
+
+    def test_with_other_error_in_close_without_raise(self):
+        logging.raiseExceptions = False
+        self._test_with_failure_in_method('close', IndexError)
+
+    def test_with_other_error_in_acquire_with_raise(self):
+        logging.raiseExceptions = True
+        self.assertRaises(IndexError, self._test_with_failure_in_method,
+                          'acquire', IndexError)
+
+    def test_with_other_error_in_flush_with_raise(self):
+        logging.raiseExceptions = True
+        self.assertRaises(IndexError, self._test_with_failure_in_method,
+                          'flush', IndexError)
+
+    def test_with_other_error_in_close_with_raise(self):
+        logging.raiseExceptions = True
+        self.assertRaises(IndexError, self._test_with_failure_in_method,
+                          'close', IndexError)
+
+
+class ModuleLevelMiscTest(BaseTest):
+
+    """Tets suite for some module level methods."""
+
+    def test_disable(self):
+        old_disable = logging.root.manager.disable
+        # confirm our assumptions are correct
+        assert old_disable == 0
+        self.addCleanup(lambda: logging.disable(old_disable))
+
+        logging.disable(83)
+        self.assertEqual(logging.root.manager.disable, 83)
+
+    def _test_log(self, method, level=None):
+        called = []
+        patch(self, logging, 'basicConfig',
+              lambda *a, **kw: called.append(a, kw))
+
+        recording = RecordingHandler()
+        logging.root.addHandler(recording)
+
+        log_method = getattr(logging, method)
+        if level is not None:
+            log_method(level, "test me: %r", recording)
+        else:
+            log_method("test me: %r", recording)
+
+        self.assertEqual(len(recording.records), 1)
+        record = recording.records[0]
+        self.assertEqual(record.getMessage(), "test me: %r" % recording)
+
+        expected_level = level if level is not None else getattr(logging, method.upper())
+        self.assertEqual(record.levelno, expected_level)
+
+        # basicConfig was not called!
+        self.assertEqual(called, [])
+
+    def test_log(self):
+        self._test_log('log', logging.ERROR)
+
+    def test_debug(self):
+        self._test_log('debug')
+
+    def test_info(self):
+        self._test_log('info')
+
+    def test_warning(self):
+        self._test_log('warning')
+
+    def test_error(self):
+        self._test_log('error')
+
+    def test_critical(self):
+        self._test_log('critical')
+
+    def test_set_logger_class(self):
+        self.assertRaises(TypeError, logging.setLoggerClass, object)
+
+        class MyLogger(logging.Logger):
+            pass
+
+        logging.setLoggerClass(MyLogger)
+        self.assertEqual(logging.getLoggerClass(), MyLogger)
+
+        logging.setLoggerClass(logging.Logger)
+        self.assertEqual(logging.getLoggerClass(), logging.Logger)
+
+
+class BasicConfigTest(unittest.TestCase):
+
+    """Tets suite for logging.basicConfig."""
+
+    def setUp(self):
+        super(BasicConfigTest, self).setUp()
+        handlers = logging.root.handlers
+        self.addCleanup(lambda: setattr(logging.root, 'handlers', handlers))
+        logging.root.handlers = []
+
+    def tearDown(self):
+        logging.shutdown()
+        super(BasicConfigTest, self).tearDown()
+
+    def test_no_kwargs(self):
+        logging.basicConfig()
+
+        # handler defaults to a StreamHandler to sys.stderr
+        self.assertEqual(len(logging.root.handlers), 1)
+        handler = logging.root.handlers[0]
+        self.assertIsInstance(handler, logging.StreamHandler)
+        self.assertEqual(handler.stream, sys.stderr)
+
+        formatter = handler.formatter
+        # format defaults to logging.BASIC_FORMAT
+        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
+        # datefmt defaults to None
+        self.assertIsNone(formatter.datefmt)
+        # style defaults to %
+        self.assertIsInstance(formatter._style, logging.PercentStyle)
+
+        # level is not explicitely set
+        self.assertEqual(logging.root.level, logging.WARNING)
+
+    def test_filename(self):
+        logging.basicConfig(filename='test.log')
+
+        self.assertEqual(len(logging.root.handlers), 1)
+        handler = logging.root.handlers[0]
+        self.assertIsInstance(handler, logging.FileHandler)
+
+        expected = logging.FileHandler('test.log', 'a')
+        self.addCleanup(expected.close)
+        self.assertEqual(handler.stream.mode, expected.stream.mode)
+        self.assertEqual(handler.stream.name, expected.stream.name)
+
+    def test_filemode(self):
+        logging.basicConfig(filename='test.log', filemode='wb')
+
+        handler = logging.root.handlers[0]
+        expected = logging.FileHandler('test.log', 'wb')
+        self.addCleanup(expected.close)
+        self.assertEqual(handler.stream.mode, expected.stream.mode)
+
+    def test_stream(self):
+        stream = io.StringIO()
+        self.addCleanup(stream.close)
+        logging.basicConfig(stream=stream)
+
+        self.assertEqual(len(logging.root.handlers), 1)
+        handler = logging.root.handlers[0]
+        self.assertIsInstance(handler, logging.StreamHandler)
+        self.assertEqual(handler.stream, stream)
+
+    def test_format(self):
+        logging.basicConfig(format='foo')
+
+        formatter = logging.root.handlers[0].formatter
+        self.assertEqual(formatter._style._fmt, 'foo')
+
+    def test_datefmt(self):
+        logging.basicConfig(datefmt='bar')
+
+        formatter = logging.root.handlers[0].formatter
+        self.assertEqual(formatter.datefmt, 'bar')
+
+    def test_style(self):
+        logging.basicConfig(style='$')
+
+        formatter = logging.root.handlers[0].formatter
+        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
+
+    def test_level(self):
+        old_level = logging.root.level
+        self.addCleanup(lambda: logging.root.setLevel(old_level))
+
+        logging.basicConfig(level=57)
+        self.assertEqual(logging.root.level, 57)
+
+    def _test_log(self, method, level=None):
+        # logging.root has no handlers so basicConfig should be called
+        called = []
+
+        old_basic_config = logging.basicConfig
+        def my_basic_config(*a, **kw):
+            old_basic_config()
+            old_level = logging.root.level
+            logging.root.setLevel(100)  # avoid having messages in stderr
+            self.addCleanup(lambda: logging.root.setLevel(old_level))
+            called.append((a, kw))
+
+        patch(self, logging, 'basicConfig', my_basic_config)
+
+        log_method = getattr(logging, method)
+        if level is not None:
+            log_method(level, "test me")
+        else:
+            log_method("test me")
+
+        # basicConfig was called with no arguments
+        self.assertEqual(called, [((), {})])
+
+    def test_log(self):
+        self._test_log('log', logging.WARNING)
+
+    def test_debug(self):
+        self._test_log('debug')
+
+    def test_info(self):
+        self._test_log('info')
+
+    def test_warning(self):
+        self._test_log('warning')
+
+    def test_error(self):
+        self._test_log('error')
+
+    def test_critical(self):
+        self._test_log('critical')
+
+
+class LoggerAdapterTest(unittest.TestCase):
+
+    def setUp(self):
+        super(LoggerAdapterTest, self).setUp()
+        old_handler_list = logging._handlerList[:]
+
+        self.recording = RecordingHandler()
+        self.logger = logging.root
+        self.logger.addHandler(self.recording)
+        self.addCleanup(lambda: self.logger.removeHandler(self.recording))
+        self.addCleanup(self.recording.close)
+
+        def cleanup():
+            logging._handlerList[:] = old_handler_list
+
+        self.addCleanup(cleanup)
+        self.addCleanup(logging.shutdown)
+        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
+
+    def test_exception(self):
+        msg = 'testing exception: %r'
+        exc = None
+        try:
+            assert False
+        except AssertionError as e:
+            exc = e
+            self.adapter.exception(msg, self.recording)
+
+        self.assertEqual(len(self.recording.records), 1)
+        record = self.recording.records[0]
+        self.assertEqual(record.levelno, logging.ERROR)
+        self.assertEqual(record.msg, msg)
+        self.assertEqual(record.args, (self.recording,))
+        self.assertEqual(record.exc_info,
+                         (exc.__class__, exc, exc.__traceback__))
+
+    def test_critical(self):
+        msg = 'critical test! %r'
+        self.adapter.critical(msg, self.recording)
+
+        self.assertEqual(len(self.recording.records), 1)
+        record = self.recording.records[0]
+        self.assertEqual(record.levelno, logging.CRITICAL)
+        self.assertEqual(record.msg, msg)
+        self.assertEqual(record.args, (self.recording,))
+
+    def test_is_enabled_for(self):
+        old_disable = self.adapter.logger.manager.disable
+        self.adapter.logger.manager.disable = 33
+        self.addCleanup(lambda: setattr(self.adapter.logger.manager,
+                                        'disable', old_disable))
+        self.assertFalse(self.adapter.isEnabledFor(32))
+
+    def test_has_handlers(self):
+        self.assertTrue(self.adapter.hasHandlers())
+
+        for handler in self.logger.handlers:
+            self.logger.removeHandler(handler)
+        assert not self.logger.hasHandlers()
+
+        self.assertFalse(self.adapter.hasHandlers())
+
+
+class LoggerTest(BaseTest):
+
+    def setUp(self):
+        super(LoggerTest, self).setUp()
+        self.recording = RecordingHandler()
+        self.logger = logging.Logger(name='blah')
+        self.logger.addHandler(self.recording)
+        self.addCleanup(lambda: self.logger.removeHandler(self.recording))
+        self.addCleanup(self.recording.close)
+        self.addCleanup(logging.shutdown)
+
+    def test_set_invalid_level(self):
+        self.assertRaises(TypeError, self.logger.setLevel, object())
+
+    def test_exception(self):
+        msg = 'testing exception: %r'
+        exc = None
+        try:
+            assert False
+        except AssertionError as e:
+            exc = e
+            self.logger.exception(msg, self.recording)
+
+        self.assertEqual(len(self.recording.records), 1)
+        record = self.recording.records[0]
+        self.assertEqual(record.levelno, logging.ERROR)
+        self.assertEqual(record.msg, msg)
+        self.assertEqual(record.args, (self.recording,))
+        self.assertEqual(record.exc_info,
+                         (exc.__class__, exc, exc.__traceback__))
+
+    def test_log_invalid_level_with_raise(self):
+        old_raise = logging.raiseExceptions
+        self.addCleanup(lambda: setattr(logging, 'raiseExecptions', old_raise))
+
+        logging.raiseExceptions = True
+        self.assertRaises(TypeError, self.logger.log, '10', 'test message')
+
+    def test_log_invalid_level_no_raise(self):
+        old_raise = logging.raiseExceptions
+        self.addCleanup(lambda: setattr(logging, 'raiseExecptions', old_raise))
+
+        logging.raiseExceptions = False
+        self.logger.log('10', 'test message')  # no exception happens
+
+    def test_find_caller_with_stack_info(self):
+        called = []
+        patch(self, logging.traceback, 'print_stack',
+              lambda f, file: called.append(file.getvalue()))
+
+        self.logger.findCaller(stack_info=True)
+
+        self.assertEqual(len(called), 1)
+        self.assertEqual('Stack (most recent call last):\n', called[0])
+
+    def test_make_record_with_extra_overwrite(self):
+        name = 'my record'
+        level = 13
+        fn = lno = msg = args = exc_info = func = sinfo = None
+        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
+                                       exc_info, func, sinfo)
+
+        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
+            extra = {key: 'some value'}
+            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
+                              fn, lno, msg, args, exc_info,
+                              extra=extra, sinfo=sinfo)
+
+    def test_make_record_with_extra_no_overwrite(self):
+        name = 'my record'
+        level = 13
+        fn = lno = msg = args = exc_info = func = sinfo = None
+        extra = {'valid_key': 'some value'}
+        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
+                                        exc_info, extra=extra, sinfo=sinfo)
+        self.assertIn('valid_key', result.__dict__)
+
+    def test_has_handlers(self):
+        self.assertTrue(self.logger.hasHandlers())
+
+        for handler in self.logger.handlers:
+            self.logger.removeHandler(handler)
+        assert not self.logger.hasHandlers()
+
+        self.assertFalse(self.logger.hasHandlers())
+
+    def test_has_handlers_no_propagate(self):
+        child_logger = logging.getLogger('blah.child')
+        child_logger.propagate = False
+        assert child_logger.handlers == []
+
+        self.assertFalse(child_logger.hasHandlers())
+
+    def test_is_enabled_for(self):
+        old_disable = self.logger.manager.disable
+        self.logger.manager.disable = 23
+        self.addCleanup(lambda: setattr(self.logger.manager,
+                                        'disable', old_disable))
+        self.assertFalse(self.logger.isEnabledFor(22))
+
+
 class BaseFileTest(BaseTest):
     "Base class for handler tests that write log files"
 
@@ -2319,6 +2808,8 @@
                  EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
                  FormatterTest,
                  LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
+                 ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
+                 LoggerAdapterTest, LoggerTest,
                  RotatingFileHandlerTest,
                  LastResortTest,
                  TimedRotatingFileHandlerTest

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list