[Python-checkins] cpython: Added tests to improve coverage.

vinay.sajip python-checkins at python.org
Mon May 2 14:18:27 CEST 2011


http://hg.python.org/cpython/rev/347fa1c0b953
changeset:   69775:347fa1c0b953
parent:      69759:513f6dfd3173
user:        Vinay Sajip <vinay_sajip at yahoo.co.uk>
date:        Mon May 02 13:17:27 2011 +0100
summary:
  Added tests to improve coverage.

files:
  Lib/test/test_logging.py |  447 ++++++++++++++++++++------
  1 files changed, 339 insertions(+), 108 deletions(-)


diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -33,6 +33,7 @@
 import pickle
 import io
 import gc
+from http.server import HTTPServer, BaseHTTPRequestHandler
 import json
 import os
 import queue
@@ -40,7 +41,8 @@
 import select
 import smtpd
 import socket
-from socketserver import ThreadingTCPServer, StreamRequestHandler
+from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
+                          ThreadingTCPServer, StreamRequestHandler)
 import struct
 import sys
 import tempfile
@@ -49,6 +51,7 @@
 import textwrap
 import time
 import unittest
+from urllib.parse import urlparse, parse_qs
 import warnings
 import weakref
 try:
@@ -596,9 +599,8 @@
             logging.raiseExceptions = old_raise
             sys.stderr = old_stderr
 
-
-TEST_SMTP_PORT = 9025
-
+# -- The following section could be moved into a server_helper.py module
+# -- if it proves to be of wider utility than just test_logging
 
 class TestSMTPChannel(smtpd.SMTPChannel):
     def __init__(self, server, conn, addr, sockmap):
@@ -642,6 +644,7 @@
             # try to re-use a server port if possible
             self.set_reuse_addr()
             self.bind(addr)
+            self.port = sock.getsockname()[1]
             self.listen(5)
         except:
             self.close()
@@ -666,23 +669,162 @@
     def serve_forever(self, poll_interval):
         asyncore.loop(poll_interval, map=self.sockmap)
 
-    def stop(self):
+    def stop(self, timeout=None):
         self.close()
-        self._thread.join()
+        self._thread.join(timeout)
         self._thread = None
 
+class ControlMixin(object):
+    """
+    This mixin is used to start a server on a separate thread, and
+    shut it down programmatically. Request handling is simplified - instead
+    of needing to derive a suitable RequestHandler subclass, you just
+    provide a callable which will be passed each received request to be
+    processed.
+
+    :param handler: A handler callable which will be called with a
+                    single parameter - the request - in order to
+                    process the request. This handler is called on the
+                    server thread, effectively meaning that requests are
+                    processed serially. While not quite Web scale ;-),
+                    this should be fine for testing applications.
+    :param poll_interval: The polling interval in seconds.
+    """
+    def __init__(self, handler, poll_interval):
+        self._thread = None
+        self.poll_interval = poll_interval
+        self._handler = handler
+        self.ready = threading.Event()
+
+    def start(self):
+        """
+        Create a daemon thread to run the server, and start it.
+        """
+        self._thread = t = threading.Thread(target=self.serve_forever,
+                                            args=(self.poll_interval,))
+        t.setDaemon(True)
+        t.start()
+
+    def serve_forever(self, poll_interval):
+        self.ready.set()
+        super(ControlMixin, self).serve_forever(poll_interval)
+
+    def stop(self, timeout=None):
+        """
+        Tell the server thread to stop, and wait for it to do so.
+        """
+        self.shutdown()
+        if self._thread is not None:
+            self._thread.join(timeout)
+            self._thread = None
+        self.server_close()
+        self.ready.clear()
+
+class TestHTTPServer(ControlMixin, HTTPServer):
+    """
+    An HTTP server which is controllable using :class:`ControlMixin`.
+
+    :param addr: A tuple with the IP address and port to listen on.
+    :param handler: A handler callable which will be called with a
+                    single parameter - the request - in order to
+                    process the request.
+    :param poll_interval: The polling interval in seconds.
+    """
+    def __init__(self, addr, handler, poll_interval=0.5, log=False):
+        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
+            def __getattr__(self, name, default=None):
+                if name.startswith('do_'):
+                    return self.process_request
+                raise AttributeError(name)
+
+            def process_request(self):
+                self.server._handler(self)
+
+            def log_message(self, format, *args):
+                if log:
+                    super(DelegatingHTTPRequestHandler,
+                          self).log_message(format, *args)
+        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
+        ControlMixin.__init__(self, handler, poll_interval)
+
+class TestTCPServer(ControlMixin, ThreadingTCPServer):
+    """
+    A TCP server which is controllable using :class:`ControlMixin`.
+
+    :param addr: A tuple with the IP address and port to listen on.
+    :param handler: A handler callable which will be called with a single
+                    parameter - the request - in order to process the request.
+    :param poll_interval: The polling interval in seconds.
+    :bind_and_activate: If True (the default), binds the server and starts it
+                        listening. If False, you need to call
+                        :meth:`server_bind` and :meth:`server_activate` at
+                        some later time before calling :meth:`start`, so that
+                        the server will set up the socket and listen on it.
+    """
+
+    allow_reuse_address = True
+
+    def __init__(self, addr, handler, poll_interval=0.5,
+                 bind_and_activate=True):
+        class DelegatingTCPRequestHandler(StreamRequestHandler):
+
+            def handle(self):
+                self.server._handler(self)
+        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
+                                    bind_and_activate)
+        ControlMixin.__init__(self, handler, poll_interval)
+
+    def server_bind(self):
+        super(TestTCPServer, self).server_bind()
+        self.port = self.socket.getsockname()[1]
+
+class TestUDPServer(ControlMixin, ThreadingUDPServer):
+    """
+    A UDP server which is controllable using :class:`ControlMixin`.
+
+    :param addr: A tuple with the IP address and port to listen on.
+    :param handler: A handler callable which will be called with a
+                    single parameter - the request - in order to
+                    process the request.
+    :param poll_interval: The polling interval for shutdown requests,
+                          in seconds.
+    :bind_and_activate: If True (the default), binds the server and
+                        starts it listening. If False, you need to
+                        call :meth:`server_bind` and
+                        :meth:`server_activate` at some later time
+                        before calling :meth:`start`, so that the server will
+                        set up the socket and listen on it.
+    """
+    def __init__(self, addr, handler, poll_interval=0.5, bind_and_activate=True):
+        class DelegatingUDPRequestHandler(DatagramRequestHandler):
+
+            def handle(self):
+                self.server._handler(self)
+        ThreadingUDPServer.__init__(self, addr, DelegatingUDPRequestHandler,
+                                    bind_and_activate)
+        ControlMixin.__init__(self, handler, poll_interval)
+
+    def server_bind(self):
+        super(TestUDPServer, self).server_bind()
+        self.port = self.socket.getsockname()[1]
+
+
+# - end of server_helper section
 
 class SMTPHandlerTest(BaseTest):
     def test_basic(self):
-        addr = ('localhost', TEST_SMTP_PORT)
+        sockmap = {}
+        server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
+                                sockmap)
+        server.start()
+        addr = ('localhost', server.port)
         h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
         self.assertEqual(h.toaddrs, ['you'])
         self.messages = []
-        sockmap = {}
-        server = TestSMTPServer(addr, self.process_message, 0.001, sockmap)
-        server.start()
         r = logging.makeLogRecord({'msg': 'Hello'})
+        self.handled = threading.Event()
         h.handle(r)
+        self.handled.wait()
         server.stop()
         self.assertEqual(len(self.messages), 1)
         peer, mailfrom, rcpttos, data = self.messages[0]
@@ -694,7 +836,7 @@
 
     def process_message(self, *args):
         self.messages.append(args)
-
+        self.handled.set()
 
 class MemoryHandlerTest(BaseTest):
 
@@ -1068,68 +1210,6 @@
             # Original logger output is empty.
             self.assert_log_lines([])
 
-class LogRecordStreamHandler(StreamRequestHandler):
-
-    """Handler for a streaming logging request. It saves the log message in the
-    TCP server's 'log_output' attribute."""
-
-    TCP_LOG_END = "!!!END!!!"
-
-    def handle(self):
-        """Handle multiple requests - each expected to be of 4-byte length,
-        followed by the LogRecord in pickle format. Logs the record
-        according to whatever policy is configured locally."""
-        while True:
-            chunk = self.connection.recv(4)
-            if len(chunk) < 4:
-                break
-            slen = struct.unpack(">L", chunk)[0]
-            chunk = self.connection.recv(slen)
-            while len(chunk) < slen:
-                chunk = chunk + self.connection.recv(slen - len(chunk))
-            obj = self.unpickle(chunk)
-            record = logging.makeLogRecord(obj)
-            self.handle_log_record(record)
-
-    def unpickle(self, data):
-        return pickle.loads(data)
-
-    def handle_log_record(self, record):
-        # If the end-of-messages sentinel is seen, tell the server to
-        #  terminate.
-        if self.TCP_LOG_END in record.msg:
-            self.server.abort = 1
-            return
-        self.server.log_output += record.msg + "\n"
-
-
-class LogRecordSocketReceiver(ThreadingTCPServer):
-
-    """A simple-minded TCP socket-based logging receiver suitable for test
-    purposes."""
-
-    allow_reuse_address = 1
-    log_output = ""
-
-    def __init__(self, host='localhost',
-                             port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
-                     handler=LogRecordStreamHandler):
-        ThreadingTCPServer.__init__(self, (host, port), handler)
-        self.abort = False
-        self.timeout = 0.1
-        self.finished = threading.Event()
-
-    def serve_until_stopped(self):
-        while not self.abort:
-            rd, wr, ex = select.select([self.socket.fileno()], [], [],
-                                       self.timeout)
-            if rd:
-                self.handle_request()
-        # Notify the main thread that we're about to exit
-        self.finished.set()
-        # close the listen socket
-        self.server_close()
-
 
 @unittest.skipUnless(threading, 'Threading required for this test.')
 class SocketHandlerTest(BaseTest):
@@ -1140,51 +1220,54 @@
         """Set up a TCP server to receive log messages, and a SocketHandler
         pointing to that server's address and port."""
         BaseTest.setUp(self)
-        self.tcpserver = LogRecordSocketReceiver(port=0)
-        self.port = self.tcpserver.socket.getsockname()[1]
-        self.threads = [
-                threading.Thread(target=self.tcpserver.serve_until_stopped)]
-        for thread in self.threads:
-            thread.start()
-
-        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
-        self.sock_hdlr.setFormatter(self.root_formatter)
+        addr = ('localhost', 0)
+        self.server = server = TestTCPServer(addr, self.handle_socket,
+                                                0.01)
+        server.start()
+        server.ready.wait()
+        self.sock_hdlr = logging.handlers.SocketHandler('localhost',
+                                                        server.port)
+        self.log_output = ''
         self.root_logger.removeHandler(self.root_logger.handlers[0])
         self.root_logger.addHandler(self.sock_hdlr)
+        self.handled = threading.Semaphore(0)
 
     def tearDown(self):
         """Shutdown the TCP server."""
         try:
-            if hasattr(self, 'tcpserver'):
-                self.tcpserver.abort = True
-                del self.tcpserver
+            self.server.stop(2.0)
             self.root_logger.removeHandler(self.sock_hdlr)
             self.sock_hdlr.close()
-            for thread in self.threads:
-                thread.join(2.0)
         finally:
             BaseTest.tearDown(self)
 
-    def get_output(self):
-        """Get the log output as received by the TCP server."""
-        # Signal the TCP receiver and wait for it to terminate.
-        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
-        self.tcpserver.finished.wait(2.0)
-        return self.tcpserver.log_output
+    def handle_socket(self, request):
+        conn = request.connection
+        while True:
+            chunk = conn.recv(4)
+            if len(chunk) < 4:
+                break
+            slen = struct.unpack(">L", chunk)[0]
+            chunk = conn.recv(slen)
+            while len(chunk) < slen:
+                chunk = chunk + conn.recv(slen - len(chunk))
+            obj = pickle.loads(chunk)
+            record = logging.makeLogRecord(obj)
+            self.log_output += record.msg + '\n'
+            self.handled.release()
 
     def test_output(self):
         # The log message sent to the SocketHandler is properly received.
         logger = logging.getLogger("tcp")
         logger.error("spam")
+        self.handled.acquire()
         logger.debug("eggs")
-        self.assertEqual(self.get_output(), "spam\neggs\n")
+        self.handled.acquire()
+        self.assertEqual(self.log_output, "spam\neggs\n")
 
     def test_noserver(self):
         # Kill the server
-        self.tcpserver.abort = True
-        del self.tcpserver
-        for thread in self.threads:
-            thread.join(2.0)
+        self.server.stop(2.0)
         #The logging call should try to connect, which should fail
         try:
             raise RuntimeError('Deliberate mistake')
@@ -1196,6 +1279,143 @@
         time.sleep(self.sock_hdlr.retryTime - now + 0.001)
         self.root_logger.error('Nor this')
 
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class DatagramHandlerTest(BaseTest):
+
+    """Test for DatagramHandler."""
+
+    def setUp(self):
+        """Set up a UDP server to receive log messages, and a DatagramHandler
+        pointing to that server's address and port."""
+        BaseTest.setUp(self)
+        addr = ('localhost', 0)
+        self.server = server = TestUDPServer(addr, self.handle_datagram,
+                                                0.01)
+        server.start()
+        server.ready.wait()
+        self.sock_hdlr = logging.handlers.DatagramHandler('localhost',
+                                                          server.port)
+        self.log_output = ''
+        self.root_logger.removeHandler(self.root_logger.handlers[0])
+        self.root_logger.addHandler(self.sock_hdlr)
+        self.handled = threading.Event()
+
+    def tearDown(self):
+        """Shutdown the UDP server."""
+        try:
+            self.server.stop(2.0)
+            self.root_logger.removeHandler(self.sock_hdlr)
+            self.sock_hdlr.close()
+        finally:
+            BaseTest.tearDown(self)
+
+    def handle_datagram(self, request):
+        slen = struct.pack('>L', 0) # length of prefix
+        packet = request.packet[len(slen):]
+        obj = pickle.loads(packet)
+        record = logging.makeLogRecord(obj)
+        self.log_output += record.msg + '\n'
+        self.handled.set()
+
+    def test_output(self):
+        # The log message sent to the DatagramHandler is properly received.
+        logger = logging.getLogger("udp")
+        logger.error("spam")
+        self.handled.wait()
+        self.assertEqual(self.log_output, "spam\n")
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SysLogHandlerTest(BaseTest):
+
+    """Test for SysLogHandler using UDP."""
+
+    def setUp(self):
+        """Set up a UDP server to receive log messages, and a SysLogHandler
+        pointing to that server's address and port."""
+        BaseTest.setUp(self)
+        addr = ('localhost', 0)
+        self.server = server = TestUDPServer(addr, self.handle_datagram,
+                                                0.01)
+        server.start()
+        server.ready.wait()
+        self.sl_hdlr = logging.handlers.SysLogHandler(('localhost',
+                                                       server.port))
+        self.log_output = ''
+        self.root_logger.removeHandler(self.root_logger.handlers[0])
+        self.root_logger.addHandler(self.sl_hdlr)
+        self.handled = threading.Event()
+
+    def tearDown(self):
+        """Shutdown the UDP server."""
+        try:
+            self.server.stop(2.0)
+            self.root_logger.removeHandler(self.sl_hdlr)
+            self.sl_hdlr.close()
+        finally:
+            BaseTest.tearDown(self)
+
+    def handle_datagram(self, request):
+        self.log_output = request.packet
+        self.handled.set()
+
+    def test_output(self):
+        # The log message sent to the SysLogHandler is properly received.
+        logger = logging.getLogger("slh")
+        logger.error("sp\xe4m")
+        self.handled.wait()
+        self.assertEqual(self.log_output, b'<11>\xef\xbb\xbfsp\xc3\xa4m\x00')
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class HTTPHandlerTest(BaseTest):
+
+    """Test for HTTPHandler."""
+
+    def setUp(self):
+        """Set up an HTTP server to receive log messages, and a HTTPHandler
+        pointing to that server's address and port."""
+        BaseTest.setUp(self)
+        addr = ('localhost', 0)
+        self.server = server = TestHTTPServer(addr, self.handle_request,
+                                                0.01)
+        server.start()
+        server.ready.wait()
+        host = 'localhost:%d' % server.server_port
+        self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob')
+        self.log_data = None
+        self.root_logger.removeHandler(self.root_logger.handlers[0])
+        self.root_logger.addHandler(self.h_hdlr)
+        self.handled = threading.Event()
+
+    def tearDown(self):
+        """Shutdown the HTTP server."""
+        try:
+            self.server.stop(2.0)
+            self.root_logger.removeHandler(self.h_hdlr)
+            self.h_hdlr.close()
+        finally:
+            BaseTest.tearDown(self)
+
+    def handle_request(self, request):
+        self.log_data = urlparse(request.path)
+        request.send_response(200)
+        self.handled.set()
+
+    def test_output(self):
+        # The log message sent to the HTTPHandler is properly received.
+        logger = logging.getLogger("http")
+        msg = "sp\xe4m"
+        logger.error(msg)
+        self.handled.wait()
+        self.assertEqual(self.log_data.path, '/frob')
+        d = parse_qs(self.log_data.query)
+        self.assertEqual(d['name'], ['http'])
+        self.assertEqual(d['funcName'], ['test_output'])
+        self.assertEqual(d['msg'], [msg])
+
+
 class MemoryTest(BaseTest):
 
     """Test memory persistence of logger objects."""
@@ -3129,7 +3349,7 @@
     def assertLogFile(self, filename):
         "Assert a log file is there and register it for deletion"
         self.assertTrue(os.path.exists(filename),
-                        msg="Log file %r does not exist")
+                        msg="Log file %r does not exist" % filename)
         self.rmfiles.append(filename)
 
 
@@ -3181,8 +3401,18 @@
         rh.close()
 
 class TimedRotatingFileHandlerTest(BaseFileTest):
-    # test methods added below
-    pass
+    # other test methods added below
+    def test_rollover(self):
+        fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S')
+        r = logging.makeLogRecord({'msg': 'testing'})
+        fh.emit(r)
+        self.assertLogFile(self.fn)
+        time.sleep(1.0)
+        fh.emit(r)
+        now = datetime.datetime.now()
+        prevsec = now - datetime.timedelta(seconds=1)
+        suffix = prevsec.strftime(".%Y-%m-%d_%H-%M-%S")
+        self.assertLogFile(self.fn + suffix)
 
 def secs(**kw):
     return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
@@ -3238,14 +3468,15 @@
 def test_main():
     run_unittest(BuiltinLevelsTest, BasicFilterTest,
                  CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest,
-                 ConfigFileTest, SocketHandlerTest, MemoryTest,
-                 EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
-                 FormatterTest, BufferingFormatterTest, StreamHandlerTest,
-                 LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
-                 ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
-                 LoggerAdapterTest, LoggerTest, SMTPHandlerTest,
-                 FileHandlerTest, RotatingFileHandlerTest,
+                 ConfigFileTest, SocketHandlerTest, DatagramHandlerTest,
+                 MemoryTest, EncodingTest, WarningsTest, ConfigDictTest,
+                 ManagerTest, FormatterTest, BufferingFormatterTest,
+                 StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
+                 QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest,
+                 BasicConfigTest, LoggerAdapterTest, LoggerTest,
+                 SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest,
                  LastResortTest, LogRecordTest, ExceptionTest,
+                 SysLogHandlerTest, HTTPHandlerTest,
                  TimedRotatingFileHandlerTest
                 )
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list