[Python-checkins] r79437 - in python/trunk/Lib/unittest: __init__.py main.py runner.py signals.py test/test_break.py test/test_discovery.py

michael.foord python-checkins at python.org
Fri Mar 26 04:18:31 CET 2010


Author: michael.foord
Date: Fri Mar 26 04:18:31 2010
New Revision: 79437

Log:
Addition of -c command line option to unittest, to handle ctrl-c during a test run more elegantly

Added:
   python/trunk/Lib/unittest/signals.py   (contents, props changed)
   python/trunk/Lib/unittest/test/test_break.py   (contents, props changed)
Modified:
   python/trunk/Lib/unittest/__init__.py
   python/trunk/Lib/unittest/main.py
   python/trunk/Lib/unittest/runner.py
   python/trunk/Lib/unittest/test/test_discovery.py

Modified: python/trunk/Lib/unittest/__init__.py
==============================================================================
--- python/trunk/Lib/unittest/__init__.py	(original)
+++ python/trunk/Lib/unittest/__init__.py	Fri Mar 26 04:18:31 2010
@@ -47,7 +47,8 @@
 __all__ = ['TestResult', 'TestCase', 'TestSuite',
            'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main',
            'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless',
-           'expectedFailure', 'TextTestResult']
+           'expectedFailure', 'TextTestResult', 'installHandler',
+           'registerResult', 'removeResult']
 
 # Expose obsolete functions for backwards compatibility
 __all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases'])
@@ -62,6 +63,7 @@
                      findTestCases)
 from .main import TestProgram, main
 from .runner import TextTestRunner, TextTestResult
+from .signals import installHandler, registerResult, removeResult
 
 # deprecated
 _TextTestResult = TextTestResult

Modified: python/trunk/Lib/unittest/main.py
==============================================================================
--- python/trunk/Lib/unittest/main.py	(original)
+++ python/trunk/Lib/unittest/main.py	Fri Mar 26 04:18:31 2010
@@ -5,10 +5,14 @@
 import types
 
 from . import loader, runner
+from .signals import installHandler
 
 __unittest = True
 
 
+FAILFAST = "  -f, --failfast   Stop on first failure\n"
+CATCHBREAK = "  -c, --catch      Catch control-C and display results\n"
+
 USAGE_AS_MAIN = """\
 Usage: %(progName)s [options] [tests]
 
@@ -16,8 +20,7 @@
   -h, --help       Show this message
   -v, --verbose    Verbose output
   -q, --quiet      Minimal output
-  -f, --failfast   Stop on first failure
-
+%(failfast)s%(catchbreak)s
 Examples:
   %(progName)s test_module                       - run tests from test_module
   %(progName)s test_module.TestClass             - run tests from
@@ -31,8 +34,7 @@
 
 Options:
   -v, --verbose    Verbose output
-  -f, --failfast   Stop on first failure
-  -s directory     Directory to start discovery ('.' default)
+%(failfast)s%(catchbreak)s  -s directory     Directory to start discovery ('.' default)
   -p pattern       Pattern to match test files ('test*.py' default)
   -t directory     Top level directory of project (default to
                    start directory)
@@ -48,8 +50,7 @@
   -h, --help       Show this message
   -v, --verbose    Verbose output
   -q, --quiet      Minimal output
-  -f, --failfast   Stop on first failure
-
+%(failfast)s%(catchbreak)s
 Examples:
   %(progName)s                               - run default set of tests
   %(progName)s MyTestSuite                   - run suite 'MyTestSuite'
@@ -58,15 +59,21 @@
                                                in MyTestCase
 """
 
+
+
 class TestProgram(object):
     """A command-line program that runs a set of tests; this is primarily
        for making test modules conveniently executable.
     """
     USAGE = USAGE_FROM_MODULE
+
+    # defaults for testing
+    failfast = catchbreak = None
+
     def __init__(self, module='__main__', defaultTest=None,
                  argv=None, testRunner=None,
                  testLoader=loader.defaultTestLoader, exit=True,
-                 verbosity=1, failfast=False):
+                 verbosity=1, failfast=None, catchbreak=None):
         if isinstance(module, basestring):
             self.module = __import__(module)
             for part in module.split('.')[1:]:
@@ -78,6 +85,7 @@
 
         self.exit = exit
         self.failfast = failfast
+        self.catchbreak = catchbreak
         self.verbosity = verbosity
         self.defaultTest = defaultTest
         self.testRunner = testRunner
@@ -89,7 +97,12 @@
     def usageExit(self, msg=None):
         if msg:
             print msg
-        print self.USAGE % self.__dict__
+        usage = {'progName': self.progName, 'catchbreak': '', 'failfast': ''}
+        if self.failfast != False:
+            usage['failfast'] = FAILFAST
+        if self.catchbreak != False:
+            usage['catchbreak'] = CATCHBREAK
+        print self.USAGE % usage
         sys.exit(2)
 
     def parseArgs(self, argv):
@@ -98,9 +111,9 @@
             return
 
         import getopt
-        long_opts = ['help', 'verbose', 'quiet', 'failfast']
+        long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch']
         try:
-            options, args = getopt.getopt(argv[1:], 'hHvqf', long_opts)
+            options, args = getopt.getopt(argv[1:], 'hHvqfc', long_opts)
             for opt, value in options:
                 if opt in ('-h','-H','--help'):
                     self.usageExit()
@@ -109,7 +122,13 @@
                 if opt in ('-v','--verbose'):
                     self.verbosity = 2
                 if opt in ('-f','--failfast'):
-                    self.failfast = True
+                    if self.failfast is None:
+                        self.failfast = True
+                    # Should this raise an exception if -f is not valid?
+                if opt in ('-c','--catch'):
+                    if self.catchbreak is None:
+                        self.catchbreak = True
+                    # Should this raise an exception if -c is not valid?
             if len(args) == 0 and self.defaultTest is None:
                 # createTests will load tests from self.module
                 self.testNames = None
@@ -137,8 +156,14 @@
         parser = optparse.OptionParser()
         parser.add_option('-v', '--verbose', dest='verbose', default=False,
                           help='Verbose output', action='store_true')
-        parser.add_option('-f', '--failfast', dest='failfast', default=False,
-                          help='Stop on first fail or error', action='store_true')
+        if self.failfast != False:
+            parser.add_option('-f', '--failfast', dest='failfast', default=False,
+                              help='Stop on first fail or error',
+                              action='store_true')
+        if self.catchbreak != False:
+            parser.add_option('-c', '--catch', dest='catchbreak', default=False,
+                              help='Catch ctrl-C and display results so far',
+                              action='store_true')
         parser.add_option('-s', '--start-directory', dest='start', default='.',
                           help="Directory to start discovery ('.' default)")
         parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
@@ -153,7 +178,13 @@
         for name, value in zip(('start', 'pattern', 'top'), args):
             setattr(options, name, value)
 
-        self.failfast = options.failfast
+        # only set options from the parsing here
+        # if they weren't set explicitly in the constructor
+        if self.failfast is None:
+            self.failfast = options.failfast
+        if self.catchbreak is None:
+            self.catchbreak = options.catchbreak
+
         if options.verbose:
             self.verbosity = 2
 
@@ -165,6 +196,8 @@
         self.test = loader.discover(start_dir, pattern, top_level_dir)
 
     def runTests(self):
+        if self.catchbreak:
+            installHandler()
         if self.testRunner is None:
             self.testRunner = runner.TextTestRunner
         if isinstance(self.testRunner, (type, types.ClassType)):

Modified: python/trunk/Lib/unittest/runner.py
==============================================================================
--- python/trunk/Lib/unittest/runner.py	(original)
+++ python/trunk/Lib/unittest/runner.py	Fri Mar 26 04:18:31 2010
@@ -4,6 +4,7 @@
 import time
 
 from . import result
+from .signals import registerResult
 
 __unittest = True
 
@@ -138,6 +139,7 @@
     def run(self, test):
         "Run the given test case or test suite."
         result = self._makeResult()
+        registerResult(result)
         result.failfast = self.failfast
         startTime = time.time()
         startTestRun = getattr(result, 'startTestRun', None)

Added: python/trunk/Lib/unittest/signals.py
==============================================================================
--- (empty file)
+++ python/trunk/Lib/unittest/signals.py	Fri Mar 26 04:18:31 2010
@@ -0,0 +1,38 @@
+import signal
+import weakref
+
+__unittest = True
+
+
+class _InterruptHandler(object):
+    def __init__(self, default_handler):
+        self.called = False
+        self.default_handler = default_handler
+
+    def __call__(self, signum, frame):
+        installed_handler = signal.getsignal(signal.SIGINT)
+        if installed_handler is not self:
+            # if we aren't the installed handler, then delegate immediately
+            # to the default handler
+            self.default_handler(signum, frame)
+
+        if self.called:
+            self.default_handler(signum, frame)
+        self.called = True
+        for result in _results.keys():
+            result.stop()
+
+_results = weakref.WeakKeyDictionary()
+def registerResult(result):
+    _results[result] = 1
+
+def removeResult(result):
+    return bool(_results.pop(result, None))
+
+_interrupt_handler = None
+def installHandler():
+    global _interrupt_handler
+    if _interrupt_handler is None:
+        default_handler = signal.getsignal(signal.SIGINT)
+        _interrupt_handler = _InterruptHandler(default_handler)
+        signal.signal(signal.SIGINT, _interrupt_handler)

Added: python/trunk/Lib/unittest/test/test_break.py
==============================================================================
--- (empty file)
+++ python/trunk/Lib/unittest/test/test_break.py	Fri Mar 26 04:18:31 2010
@@ -0,0 +1,225 @@
+import gc
+import os
+import signal
+import weakref
+
+from cStringIO import StringIO
+
+
+import unittest
+
+
+ at unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
+class TestBreak(unittest.TestCase):
+
+    def setUp(self):
+        self._default_handler = signal.getsignal(signal.SIGINT)
+
+    def tearDown(self):
+        signal.signal(signal.SIGINT, self._default_handler)
+        unittest.signals._results = weakref.WeakKeyDictionary()
+        unittest.signals._interrupt_handler = None
+
+
+    def testInstallHandler(self):
+        default_handler = signal.getsignal(signal.SIGINT)
+        unittest.installHandler()
+        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+        try:
+            pid = os.getpid()
+            os.kill(pid, signal.SIGINT)
+        except KeyboardInterrupt:
+            self.fail("KeyboardInterrupt not handled")
+
+        self.assertTrue(unittest.signals._interrupt_handler.called)
+
+    def testRegisterResult(self):
+        result = unittest.TestResult()
+        unittest.registerResult(result)
+
+        for ref in unittest.signals._results:
+            if ref is result:
+                break
+            elif ref is not result:
+                self.fail("odd object in result set")
+        else:
+            self.fail("result not found")
+
+
+    def testInterruptCaught(self):
+        default_handler = signal.getsignal(signal.SIGINT)
+
+        result = unittest.TestResult()
+        unittest.installHandler()
+        unittest.registerResult(result)
+
+        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+        def test(result):
+            pid = os.getpid()
+            os.kill(pid, signal.SIGINT)
+            result.breakCaught = True
+            self.assertTrue(result.shouldStop)
+
+        try:
+            test(result)
+        except KeyboardInterrupt:
+            self.fail("KeyboardInterrupt not handled")
+        self.assertTrue(result.breakCaught)
+
+
+    def testSecondInterrupt(self):
+        result = unittest.TestResult()
+        unittest.installHandler()
+        unittest.registerResult(result)
+
+        def test(result):
+            pid = os.getpid()
+            os.kill(pid, signal.SIGINT)
+            result.breakCaught = True
+            self.assertTrue(result.shouldStop)
+            os.kill(pid, signal.SIGINT)
+            self.fail("Second KeyboardInterrupt not raised")
+
+        try:
+            test(result)
+        except KeyboardInterrupt:
+            pass
+        else:
+            self.fail("Second KeyboardInterrupt not raised")
+        self.assertTrue(result.breakCaught)
+
+
+    def testTwoResults(self):
+        unittest.installHandler()
+
+        result = unittest.TestResult()
+        unittest.registerResult(result)
+        new_handler = signal.getsignal(signal.SIGINT)
+
+        result2 = unittest.TestResult()
+        unittest.registerResult(result2)
+        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
+
+        result3 = unittest.TestResult()
+
+        def test(result):
+            pid = os.getpid()
+            os.kill(pid, signal.SIGINT)
+
+        try:
+            test(result)
+        except KeyboardInterrupt:
+            self.fail("KeyboardInterrupt not handled")
+
+        self.assertTrue(result.shouldStop)
+        self.assertTrue(result2.shouldStop)
+        self.assertFalse(result3.shouldStop)
+
+
+    def testHandlerReplacedButCalled(self):
+        # If our handler has been replaced (is no longer installed) but is
+        # called by the *new* handler, then it isn't safe to delay the
+        # SIGINT and we should immediately delegate to the default handler
+        unittest.installHandler()
+
+        handler = signal.getsignal(signal.SIGINT)
+        def new_handler(frame, signum):
+            handler(frame, signum)
+        signal.signal(signal.SIGINT, new_handler)
+
+        try:
+            pid = os.getpid()
+            os.kill(pid, signal.SIGINT)
+        except KeyboardInterrupt:
+            pass
+        else:
+            self.fail("replaced but delegated handler doesn't raise interrupt")
+
+    def testRunner(self):
+        # Creating a TextTestRunner with the appropriate argument should
+        # register the TextTestResult it creates
+        runner = unittest.TextTestRunner(stream=StringIO())
+
+        result = runner.run(unittest.TestSuite())
+        self.assertIn(result, unittest.signals._results)
+
+    def testWeakReferences(self):
+        # Calling registerResult on a result should not keep it alive
+        result = unittest.TestResult()
+        unittest.registerResult(result)
+
+        ref = weakref.ref(result)
+        del result
+
+        # For non-reference counting implementations
+        gc.collect();gc.collect()
+        self.assertIsNone(ref())
+
+
+    def testRemoveResult(self):
+        result = unittest.TestResult()
+        unittest.registerResult(result)
+
+        unittest.installHandler()
+        self.assertTrue(unittest.removeResult(result))
+
+        # Should this raise an error instead?
+        self.assertFalse(unittest.removeResult(unittest.TestResult()))
+
+        try:
+            pid = os.getpid()
+            os.kill(pid, signal.SIGINT)
+        except KeyboardInterrupt:
+            pass
+
+        self.assertFalse(result.shouldStop)
+
+    def testMainInstallsHandler(self):
+        failfast = object()
+        test = object()
+        verbosity = object()
+        result = object()
+        default_handler = signal.getsignal(signal.SIGINT)
+
+        class FakeRunner(object):
+            initArgs = []
+            runArgs = []
+            def __init__(self, *args, **kwargs):
+                self.initArgs.append((args, kwargs))
+            def run(self, test):
+                self.runArgs.append(test)
+                return result
+
+        class Program(unittest.TestProgram):
+            def __init__(self, catchbreak):
+                self.exit = False
+                self.verbosity = verbosity
+                self.failfast = failfast
+                self.catchbreak = catchbreak
+                self.testRunner = FakeRunner
+                self.test = test
+                self.result = None
+
+        p = Program(False)
+        p.runTests()
+
+        self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity,
+                                                'failfast': failfast})])
+        self.assertEqual(FakeRunner.runArgs, [test])
+        self.assertEqual(p.result, result)
+
+        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+        FakeRunner.initArgs = []
+        FakeRunner.runArgs = []
+        p = Program(True)
+        p.runTests()
+
+        self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity,
+                                                'failfast': failfast})])
+        self.assertEqual(FakeRunner.runArgs, [test])
+        self.assertEqual(p.result, result)
+
+        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)

Modified: python/trunk/Lib/unittest/test/test_discovery.py
==============================================================================
--- python/trunk/Lib/unittest/test/test_discovery.py	(original)
+++ python/trunk/Lib/unittest/test/test_discovery.py	Fri Mar 26 04:18:31 2010
@@ -279,14 +279,17 @@
         self.assertEqual(program.test, 'tests')
         self.assertEqual(Loader.args, [('.', 'fish', None)])
         self.assertFalse(program.failfast)
+        self.assertFalse(program.catchbreak)
 
         Loader.args = []
         program = object.__new__(unittest.TestProgram)
-        program._do_discovery(['-p', 'eggs', '-s', 'fish', '-v', '-f'], Loader=Loader)
+        program._do_discovery(['-p', 'eggs', '-s', 'fish', '-v', '-f', '-c'],
+                              Loader=Loader)
         self.assertEqual(program.test, 'tests')
         self.assertEqual(Loader.args, [('fish', 'eggs', None)])
         self.assertEqual(program.verbosity, 2)
         self.assertTrue(program.failfast)
+        self.assertTrue(program.catchbreak)
 
 
 if __name__ == '__main__':


More information about the Python-checkins mailing list