[Python-checkins] cpython: test_logging coverage improvements.

vinay.sajip python-checkins at python.org
Tue Apr 26 19:43:12 CEST 2011


http://hg.python.org/cpython/rev/ababe8a73327
changeset:   69575:ababe8a73327
user:        Vinay Sajip <vinay_sajip at yahoo.co.uk>
date:        Tue Apr 26 18:43:05 2011 +0100
summary:
  test_logging coverage improvements.

files:
  Lib/logging/__init__.py  |   56 ++++++------
  Lib/test/test_logging.py |  113 +++++++++++++++++++++++++-
  2 files changed, 136 insertions(+), 33 deletions(-)


diff --git a/Lib/logging/__init__.py b/Lib/logging/__init__.py
--- a/Lib/logging/__init__.py
+++ b/Lib/logging/__init__.py
@@ -37,13 +37,13 @@
 
 try:
     import codecs
-except ImportError:
+except ImportError: #pragma: no cover
     codecs = None
 
 try:
     import _thread as thread
     import threading
-except ImportError:
+except ImportError: #pragma: no cover
     thread = None
 
 __author__  = "Vinay Sajip <vinay_sajip at red-dove.com>"
@@ -67,16 +67,16 @@
     _srcfile = __file__
 _srcfile = os.path.normcase(_srcfile)
 
-# next bit filched from 1.5.2's inspect.py
-def currentframe():
-    """Return the frame object for the caller's stack frame."""
-    try:
-        raise Exception
-    except:
-        return sys.exc_info()[2].tb_frame.f_back
 
-if hasattr(sys, '_getframe'): currentframe = lambda: sys._getframe(3)
-# done filching
+if hasattr(sys, '_getframe'):
+    currentframe = lambda: sys._getframe(3)
+else: #pragma: no cover
+    def currentframe():
+        """Return the frame object for the caller's stack frame."""
+        try:
+            raise Exception
+        except:
+            return sys.exc_info()[2].tb_frame.f_back
 
 # _srcfile is only used in conjunction with sys._getframe().
 # To provide compatibility with older versions of Python, set _srcfile
@@ -94,22 +94,22 @@
 #raiseExceptions is used to see if exceptions during handling should be
 #propagated
 #
-raiseExceptions = 1
+raiseExceptions = True
 
 #
 # If you don't want threading information in the log, set this to zero
 #
-logThreads = 1
+logThreads = True
 
 #
 # If you don't want multiprocessing information in the log, set this to zero
 #
-logMultiprocessing = 1
+logMultiprocessing = True
 
 #
 # If you don't want process information in the log, set this to zero
 #
-logProcesses = 1
+logProcesses = True
 
 #---------------------------------------------------------------------------
 #   Level related stuff
@@ -201,7 +201,7 @@
 #
 if thread:
     _lock = threading.RLock()
-else:
+else: #pragma: no cover
     _lock = None
 
 
@@ -281,10 +281,10 @@
         if logThreads and thread:
             self.thread = thread.get_ident()
             self.threadName = threading.current_thread().name
-        else:
+        else: # pragma: no cover
             self.thread = None
             self.threadName = None
-        if not logMultiprocessing:
+        if not logMultiprocessing: # pragma: no cover
             self.processName = None
         else:
             self.processName = 'MainProcess'
@@ -644,11 +644,11 @@
         yes. If deemed appropriate, the record may be modified in-place.
         """
         if self.nlen == 0:
-            return 1
+            return True
         elif self.name == record.name:
-            return 1
+            return True
         elif record.name.find(self.name, 0, self.nlen) != 0:
-            return 0
+            return False
         return (record.name[self.nlen] == ".")
 
 class Filterer(object):
@@ -775,7 +775,7 @@
         """
         if thread:
             self.lock = threading.RLock()
-        else:
+        else: #pragma: no cover
             self.lock = None
 
     def acquire(self):
@@ -939,7 +939,7 @@
             stream.write(msg)
             stream.write(self.terminator)
             self.flush()
-        except (KeyboardInterrupt, SystemExit):
+        except (KeyboardInterrupt, SystemExit): #pragma: no cover
             raise
         except:
             self.handleError(record)
@@ -948,13 +948,13 @@
     """
     A handler class which writes formatted logging records to disk files.
     """
-    def __init__(self, filename, mode='a', encoding=None, delay=0):
+    def __init__(self, filename, mode='a', encoding=None, delay=False):
         """
         Open the specified file and use it as the stream for logging.
         """
         #keep the absolute path, otherwise derived classes which use this
         #may come a cropper when the current directory changes
-        if codecs is None:
+        if codecs is None:  #pragma: no cover
             encoding = None
         self.baseFilename = os.path.abspath(filename)
         self.mode = mode
@@ -1352,9 +1352,9 @@
             #IronPython can use logging.
             try:
                 fn, lno, func, sinfo = self.findCaller(stack_info)
-            except ValueError:
+            except ValueError: # pragma: no cover
                 fn, lno, func = "(unknown file)", 0, "(unknown function)"
-        else:
+        else: # pragma: no cover
             fn, lno, func = "(unknown file)", 0, "(unknown function)"
         if exc_info:
             if not isinstance(exc_info, tuple):
@@ -1465,7 +1465,7 @@
         Is this logger enabled for level 'level'?
         """
         if self.manager.disable >= level:
-            return 0
+            return False
         return level >= self.getEffectiveLevel()
 
     def getChild(self, suffix):
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -494,6 +494,37 @@
             handler.removeFilter(garr)
 
 
+class HandlerTest(BaseTest):
+    def test_name(self):
+        h = logging.Handler()
+        h.name = 'generic'
+        self.assertEqual(h.name, 'generic')
+        h.name = 'anothergeneric'
+        self.assertEqual(h.name, 'anothergeneric')
+        self.assertRaises(NotImplementedError, h.emit, None)
+
+    def test_abc(self):
+        pass
+
+class BadStream(object):
+    def write(self, data):
+        raise RuntimeError('deliberate mistake')
+
+class TestStreamHandler(logging.StreamHandler):
+    def handleError(self, record):
+        self.error_record = record
+
+class StreamHandlerTest(BaseTest):
+    def test_error_handling(self):
+        h = TestStreamHandler(BadStream())
+        r = logging.makeLogRecord({})
+        old_raise = logging.raiseExceptions
+        try:
+            h.handle(r)
+            self.assertIs(h.error_record, r)
+        finally:
+            logging.raiseExceptions = old_raise
+
 class MemoryHandlerTest(BaseTest):
 
     """Tests for the MemoryHandler."""
@@ -2196,6 +2227,39 @@
         f = logging.Formatter('asctime', style='$')
         self.assertFalse(f.usesTime())
 
+    def test_invalid_style(self):
+        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
+
+    def test_time(self):
+        r = self.get_record()
+        r.created = 735375780.0 # 21 April 1993 08:03:00 BST (07:03:00 UTC)
+        r.msecs = 123
+        f = logging.Formatter('%(asctime)s %(message)s')
+        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
+        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
+
+class ExceptionTest(BaseTest):
+    def test_formatting(self):
+        r = self.root_logger
+        h = RecordingHandler()
+        r.addHandler(h)
+        try:
+            raise RuntimeError('deliberate mistake')
+        except:
+            logging.exception('failed', stack_info=True)
+        r.removeHandler(h)
+        h.close()
+        r = h.records[0]
+        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
+                                              'call last):\n'))
+        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
+                                            'deliberate mistake'))
+        self.assertTrue(r.stack_info.startswith('Stack (most recent '
+                                              'call last):\n'))
+        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
+                                            'stack_info=True)'))
+
+
 class LastResortTest(BaseTest):
     def test_last_resort(self):
         # Test the last resort handler
@@ -2407,6 +2471,23 @@
         logging.setLoggerClass(logging.Logger)
         self.assertEqual(logging.getLoggerClass(), logging.Logger)
 
+class LogRecordTest(BaseTest):
+    def test_str_rep(self):
+        r = logging.makeLogRecord({})
+        s = str(r)
+        self.assertTrue(s.startswith('<LogRecord: '))
+        self.assertTrue(s.endswith('>'))
+
+    def test_dict_arg(self):
+        h = RecordingHandler()
+        r = logging.getLogger()
+        r.addHandler(h)
+        d = {'less' : 'more' }
+        logging.warning('less is %(less)s', d)
+        self.assertIs(h.records[0].args, d)
+        self.assertEqual(h.records[0].message, 'less is more')
+        r.removeHandler(h)
+        h.close()
 
 class BasicConfigTest(unittest.TestCase):
 
@@ -2508,6 +2589,9 @@
 
         logging.basicConfig(level=57)
         self.assertEqual(logging.root.level, 57)
+        # Test that second call has no effect
+        logging.basicConfig(level=58)
+        self.assertEqual(logging.root.level, 57)
 
     def test_incompatible(self):
         assertRaises = self.assertRaises
@@ -2521,12 +2605,20 @@
                                                      handlers=handlers)
 
     def test_handlers(self):
-        handlers = [logging.StreamHandler(), logging.StreamHandler(sys.stdout)]
+        handlers = [
+            logging.StreamHandler(),
+            logging.StreamHandler(sys.stdout),
+            logging.StreamHandler(),
+        ]
+        f = logging.Formatter()
+        handlers[2].setFormatter(f)
         logging.basicConfig(handlers=handlers)
         self.assertIs(handlers[0], logging.root.handlers[0])
         self.assertIs(handlers[1], logging.root.handlers[1])
+        self.assertIs(handlers[2], logging.root.handlers[2])
         self.assertIsNotNone(handlers[0].formatter)
         self.assertIsNotNone(handlers[1].formatter)
+        self.assertIs(handlers[2].formatter, f)
         self.assertIs(handlers[0].formatter, handlers[1].formatter)
 
     def _test_log(self, method, level=None):
@@ -2758,6 +2850,17 @@
         self.rmfiles.append(filename)
 
 
+class FileHandlerTest(BaseFileTest):
+    def test_delay(self):
+        os.unlink(self.fn)
+        fh = logging.FileHandler(self.fn, delay=True)
+        self.assertIsNone(fh.stream)
+        self.assertFalse(os.path.exists(self.fn))
+        fh.handle(logging.makeLogRecord({}))
+        self.assertIsNotNone(fh.stream)
+        self.assertTrue(os.path.exists(self.fn))
+        fh.close()
+
 class RotatingFileHandlerTest(BaseFileTest):
     def next_rec(self):
         return logging.LogRecord('n', logging.DEBUG, 'p', 1,
@@ -2851,15 +2954,15 @@
 @run_with_locale('LC_ALL', '')
 def test_main():
     run_unittest(BuiltinLevelsTest, BasicFilterTest,
-                 CustomLevelsAndFiltersTest, MemoryHandlerTest,
+                 CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest,
                  ConfigFileTest, SocketHandlerTest, MemoryTest,
                  EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
-                 FormatterTest,
+                 FormatterTest, StreamHandlerTest,
                  LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
                  ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
                  LoggerAdapterTest, LoggerTest,
-                 RotatingFileHandlerTest,
-                 LastResortTest,
+                 FileHandlerTest, RotatingFileHandlerTest,
+                 LastResortTest, LogRecordTest, ExceptionTest,
                  TimedRotatingFileHandlerTest
                 )
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list