[Python-checkins] cpython: Improved test coverage.

vinay.sajip python-checkins at python.org
Sat Apr 30 22:53:03 CEST 2011


http://hg.python.org/cpython/rev/2ab07776510c
changeset:   69723:2ab07776510c
user:        Vinay Sajip <vinay_sajip at yahoo.co.uk>
date:        Sat Apr 30 21:52:48 2011 +0100
summary:
  Improved test coverage.

files:
  Lib/test/test_logging.py |  109 ++++++++++++++++++++++++++-
  1 files changed, 105 insertions(+), 4 deletions(-)


diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -25,8 +25,11 @@
 import logging.handlers
 import logging.config
 
+import asynchat
+import asyncore
 import codecs
 import datetime
+import errno
 import pickle
 import io
 import gc
@@ -35,6 +38,7 @@
 import queue
 import re
 import select
+import smtpd
 import socket
 from socketserver import ThreadingTCPServer, StreamRequestHandler
 import struct
@@ -547,9 +551,6 @@
                 h.close()
             except socket.error: # syslogd might not be available
                 pass
-        h = logging.handlers.SMTPHandler('localhost', 'me', 'you', 'Log')
-        self.assertEqual(h.toaddrs, ['you'])
-        h.close()
         for method in ('GET', 'POST', 'PUT'):
             if method == 'PUT':
                 self.assertRaises(ValueError, logging.handlers.HTTPHandler,
@@ -595,6 +596,106 @@
             logging.raiseExceptions = old_raise
             sys.stderr = old_stderr
 
+
+TEST_SMTP_PORT = 9025
+
+
+class TestSMTPChannel(smtpd.SMTPChannel):
+    def __init__(self, server, conn, addr, sockmap):
+        asynchat.async_chat.__init__(self, conn, sockmap)
+        self.smtp_server = server
+        self.conn = conn
+        self.addr = addr
+        self.received_lines = []
+        self.smtp_state = self.COMMAND
+        self.seen_greeting = ''
+        self.mailfrom = None
+        self.rcpttos = []
+        self.received_data = ''
+        self.fqdn = socket.getfqdn()
+        self.num_bytes = 0
+        try:
+            self.peer = conn.getpeername()
+        except socket.error as err:
+            # a race condition  may occur if the other end is closing
+            # before we can get the peername
+            self.close()
+            if err.args[0] != errno.ENOTCONN:
+                raise
+            return
+        self.push('220 %s %s' % (self.fqdn, smtpd.__version__))
+        self.set_terminator(b'\r\n')
+
+
+class TestSMTPServer(smtpd.SMTPServer):
+    channel_class = TestSMTPChannel
+
+    def __init__(self, addr, handler, poll_interval, sockmap):
+        self._localaddr = addr
+        self._remoteaddr = None
+        self.sockmap = sockmap
+        asyncore.dispatcher.__init__(self, map=sockmap)
+        try:
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.setblocking(0)
+            self.set_socket(sock, map=sockmap)
+            # try to re-use a server port if possible
+            self.set_reuse_addr()
+            self.bind(addr)
+            self.listen(5)
+        except:
+            self.close()
+            raise
+        self._handler = handler
+        self._thread = None
+        self.poll_interval = poll_interval
+
+    def handle_accepted(self, conn, addr):
+        print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM)
+        channel = self.channel_class(self, conn, addr, self.sockmap)
+
+    def process_message(self, peer, mailfrom, rcpttos, data):
+        self._handler(peer, mailfrom, rcpttos, data)
+
+    def start(self):
+        self._thread = t = threading.Thread(target=self.serve_forever,
+                                            args=(self.poll_interval,))
+        t.setDaemon(True)
+        t.start()
+
+    def serve_forever(self, poll_interval):
+        asyncore.loop(poll_interval, map=self.sockmap)
+
+    def stop(self):
+        self.close()
+        self._thread.join()
+        self._thread = None
+
+
+class SMTPHandlerTest(BaseTest):
+    def test_basic(self):
+        addr = ('localhost', TEST_SMTP_PORT)
+        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
+        self.assertEqual(h.toaddrs, ['you'])
+        self.messages = []
+        sockmap = {}
+        server = TestSMTPServer(addr, self.process_message, 0.001, sockmap)
+        server.start()
+        r = logging.makeLogRecord({'msg': 'Hello'})
+        h.handle(r)
+        server.stop()
+        self.assertEqual(len(self.messages), 1)
+        peer, mailfrom, rcpttos, data = self.messages[0]
+        self.assertEqual(mailfrom, 'me')
+        self.assertEqual(rcpttos, ['you'])
+        self.assertTrue('\nSubject: Log\n' in data)
+        self.assertTrue(data.endswith('\n\nHello'))
+        h.close()
+
+    def process_message(self, *args):
+        self.messages.append(args)
+
+
 class MemoryHandlerTest(BaseTest):
 
     """Tests for the MemoryHandler."""
@@ -3142,7 +3243,7 @@
                  FormatterTest, BufferingFormatterTest, StreamHandlerTest,
                  LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
                  ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
-                 LoggerAdapterTest, LoggerTest,
+                 LoggerAdapterTest, LoggerTest, SMTPHandlerTest,
                  FileHandlerTest, RotatingFileHandlerTest,
                  LastResortTest, LogRecordTest, ExceptionTest,
                  TimedRotatingFileHandlerTest

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list