[Python-checkins] cpython (3.5): Fixes #27930: improved QueueListener behaviour.

vinay.sajip python-checkins at python.org
Wed Sep 7 20:25:16 EDT 2016


https://hg.python.org/cpython/rev/b2ea0ede3753
changeset:   103266:b2ea0ede3753
branch:      3.5
parent:      103258:8f02a1a6b647
user:        Vinay Sajip <vinay_sajip at yahoo.co.uk>
date:        Thu Sep 08 01:13:39 2016 +0100
summary:
  Fixes #27930: improved QueueListener behaviour.

files:
  Lib/logging/handlers.py  |  21 +-----
  Lib/test/test_logging.py |  90 ++++++++++++++++++++++++++-
  Misc/NEWS                |   3 +
  3 files changed, 93 insertions(+), 21 deletions(-)


diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py
--- a/Lib/logging/handlers.py
+++ b/Lib/logging/handlers.py
@@ -1,4 +1,4 @@
-# Copyright 2001-2015 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved.
 #
 # Permission to use, copy, modify, and distribute this software and its
 # documentation for any purpose and without fee is hereby granted,
@@ -18,7 +18,7 @@
 Additional handlers for the logging package for Python. The core package is
 based on PEP 282 and comments thereto in comp.lang.python.
 
-Copyright (C) 2001-2015 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved.
 
 To use, simply 'import logging.handlers' and log away!
 """
@@ -1366,7 +1366,6 @@
             """
             self.queue = queue
             self.handlers = handlers
-            self._stop = threading.Event()
             self._thread = None
             self.respect_handler_level = respect_handler_level
 
@@ -1387,7 +1386,7 @@
             LogRecords to process.
             """
             self._thread = t = threading.Thread(target=self._monitor)
-            t.setDaemon(True)
+            t.daemon = True
             t.start()
 
         def prepare(self , record):
@@ -1426,7 +1425,7 @@
             """
             q = self.queue
             has_task_done = hasattr(q, 'task_done')
-            while not self._stop.isSet():
+            while True:
                 try:
                     record = self.dequeue(True)
                     if record is self._sentinel:
@@ -1435,17 +1434,6 @@
                     if has_task_done:
                         q.task_done()
                 except queue.Empty:
-                    pass
-            # There might still be records in the queue.
-            while True:
-                try:
-                    record = self.dequeue(False)
-                    if record is self._sentinel:
-                        break
-                    self.handle(record)
-                    if has_task_done:
-                        q.task_done()
-                except queue.Empty:
                     break
 
         def enqueue_sentinel(self):
@@ -1466,7 +1454,6 @@
             Note that if you don't call this before your application exits, there
             may be some records still left on the queue, which won't be processed.
             """
-            self._stop.set()
             self.enqueue_sentinel()
             self._thread.join()
             self._thread = None
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -1,4 +1,4 @@
-# Copyright 2001-2014 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved.
 #
 # Permission to use, copy, modify, and distribute this software and its
 # documentation for any purpose and without fee is hereby granted,
@@ -16,7 +16,7 @@
 
 """Test harness for the logging module. Run all tests.
 
-Copyright (C) 2001-2014 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved.
 """
 
 import logging
@@ -3022,6 +3022,84 @@
         self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
         self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
 
+if hasattr(logging.handlers, 'QueueListener'):
+    import multiprocessing
+    from unittest.mock import patch
+
+    class QueueListenerTest(BaseTest):
+        """
+        Tests based on patch submitted for issue #27930. Ensure that
+        QueueListener handles all log messages.
+        """
+
+        repeat = 20
+
+        @staticmethod
+        def setup_and_log(log_queue, ident):
+            """
+            Creates a logger with a QueueHandler that logs to a queue read by a
+            QueueListener. Starts the listener, logs five messages, and stops
+            the listener.
+            """
+            logger = logging.getLogger('test_logger_with_id_%s' % ident)
+            logger.setLevel(logging.DEBUG)
+            handler = logging.handlers.QueueHandler(log_queue)
+            logger.addHandler(handler)
+            listener = logging.handlers.QueueListener(log_queue)
+            listener.start()
+
+            logger.info('one')
+            logger.info('two')
+            logger.info('three')
+            logger.info('four')
+            logger.info('five')
+
+            listener.stop()
+            logger.removeHandler(handler)
+            handler.close()
+
+        @patch.object(logging.handlers.QueueListener, 'handle')
+        def test_handle_called_with_queue_queue(self, mock_handle):
+            for i in range(self.repeat):
+                log_queue = queue.Queue()
+                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
+            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
+                             'correct number of handled log messages')
+
+        @patch.object(logging.handlers.QueueListener, 'handle')
+        def test_handle_called_with_mp_queue(self, mock_handle):
+            for i in range(self.repeat):
+                log_queue = multiprocessing.Queue()
+                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
+            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
+                             'correct number of handled log messages')
+
+        @staticmethod
+        def get_all_from_queue(log_queue):
+            try:
+                while True:
+                    yield log_queue.get_nowait()
+            except queue.Empty:
+                return []
+
+        def test_no_messages_in_queue_after_stop(self):
+            """
+            Five messages are logged then the QueueListener is stopped. This
+            test then gets everything off the queue. Failure of this test
+            indicates that messages were not registered on the queue until
+            _after_ the QueueListener stopped.
+            """
+            for i in range(self.repeat):
+                queue = multiprocessing.Queue()
+                self.setup_and_log(queue, '%s_%s' %(self.id(), i))
+                # time.sleep(1)
+                items = list(self.get_all_from_queue(queue))
+                expected = [[], [logging.handlers.QueueListener._sentinel]]
+                self.assertIn(items, expected,
+                              'Found unexpected messages in queue: %s' % (
+                                    [m.msg if isinstance(m, logging.LogRecord)
+                                     else m for m in items]))
+
 
 ZERO = datetime.timedelta(0)
 
@@ -4166,7 +4244,7 @@
 # first and restore it at the end.
 @support.run_with_locale('LC_ALL', '')
 def test_main():
-    support.run_unittest(
+    tests = [
         BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
         HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
         DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
@@ -4177,7 +4255,11 @@
         RotatingFileHandlerTest,  LastResortTest, LogRecordTest,
         ExceptionTest, SysLogHandlerTest, HTTPHandlerTest,
         NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
-        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest)
+        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
+    ]
+    if hasattr(logging.handlers, 'QueueListener'):
+        tests.append(QueueListenerTest)
+    support.run_unittest(*tests)
 
 if __name__ == "__main__":
     test_main()
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -218,6 +218,9 @@
 - Issue #27392: Add loop.connect_accepted_socket().
   Patch by Jim Fulton.
 
+- Issue #27930: Improved behaviour of logging.handlers.QueueListener.
+  Thanks to Paulo Andrade and Petr Viktorin for the analysis and patch.
+
 IDLE
 ----
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list