[Python-checkins] cpython (merge 3.4 -> default): (Merge 3.4) Close #22175: Improve test_faulthandler readability with dedent.

victor.stinner python-checkins at python.org
Sun Aug 10 19:52:49 CEST 2014


http://hg.python.org/cpython/rev/2929cc408fbb
changeset:   92068:2929cc408fbb
parent:      92065:147d778e65e8
parent:      92067:51472399ca09
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Sun Aug 10 19:51:05 2014 +0200
summary:
  (Merge 3.4) Close #22175: Improve test_faulthandler readability with dedent.
Patch written by Xavier de Gaye.

files:
  Lib/test/test_faulthandler.py |  336 +++++++++++----------
  1 files changed, 169 insertions(+), 167 deletions(-)


diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py
--- a/Lib/test/test_faulthandler.py
+++ b/Lib/test/test_faulthandler.py
@@ -10,6 +10,7 @@
 from test.script_helper import assert_python_ok
 import tempfile
 import unittest
+from textwrap import dedent
 
 try:
     import threading
@@ -51,6 +52,7 @@
         build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
         thread XXX".
         """
+        code = dedent(code).strip()
         with support.SuppressCrashReport():
             process = script_helper.spawn_python('-c', code)
         stdout, stderr = process.communicate()
@@ -80,15 +82,15 @@
         else:
             header = 'Stack (most recent call first)'
         regex = """
-^Fatal Python error: {name}
+            ^Fatal Python error: {name}
 
-{header}:
-  File "<string>", line {lineno} in <module>
-""".strip()
-        regex = regex.format(
+            {header}:
+              File "<string>", line {lineno} in <module>
+            """
+        regex = dedent(regex.format(
             lineno=line_number,
             name=name_regex,
-            header=re.escape(header))
+            header=re.escape(header))).strip()
         if other_regex:
             regex += '|' + other_regex
         output, exitcode = self.get_output(code, filename)
@@ -100,29 +102,29 @@
                      "the first page of memory is a mapped read-only on AIX")
     def test_read_null(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._read_null()
-""".strip(),
+            import faulthandler
+            faulthandler.enable()
+            faulthandler._read_null()
+            """,
             3,
             # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
             '(?:Segmentation fault|Bus error|Illegal instruction)')
 
     def test_sigsegv(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigsegv()
-""".strip(),
+            import faulthandler
+            faulthandler.enable()
+            faulthandler._sigsegv()
+            """,
             3,
             'Segmentation fault')
 
     def test_sigabrt(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigabrt()
-""".strip(),
+            import faulthandler
+            faulthandler.enable()
+            faulthandler._sigabrt()
+            """,
             3,
             'Aborted')
 
@@ -130,10 +132,10 @@
                      "SIGFPE cannot be caught on Windows")
     def test_sigfpe(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._sigfpe()
-""".strip(),
+            import faulthandler
+            faulthandler.enable()
+            faulthandler._sigfpe()
+            """,
             3,
             'Floating point exception')
 
@@ -141,13 +143,13 @@
     @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
     def test_sigbus(self):
         self.check_fatal_error("""
-import _testcapi
-import faulthandler
-import signal
+            import _testcapi
+            import faulthandler
+            import signal
 
-faulthandler.enable()
-_testcapi.raise_signal(signal.SIGBUS)
-""".strip(),
+            faulthandler.enable()
+            _testcapi.raise_signal(signal.SIGBUS)
+            """,
             6,
             'Bus error')
 
@@ -155,21 +157,21 @@
     @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
     def test_sigill(self):
         self.check_fatal_error("""
-import _testcapi
-import faulthandler
-import signal
+            import _testcapi
+            import faulthandler
+            import signal
 
-faulthandler.enable()
-_testcapi.raise_signal(signal.SIGILL)
-""".strip(),
+            faulthandler.enable()
+            _testcapi.raise_signal(signal.SIGILL)
+            """,
             6,
             'Illegal instruction')
 
     def test_fatal_error(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler._fatal_error(b'xyz')
-""".strip(),
+            import faulthandler
+            faulthandler._fatal_error(b'xyz')
+            """,
             2,
             'xyz')
 
@@ -180,52 +182,52 @@
                      'need faulthandler._stack_overflow()')
     def test_stack_overflow(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._stack_overflow()
-""".strip(),
+            import faulthandler
+            faulthandler.enable()
+            faulthandler._stack_overflow()
+            """,
             3,
             '(?:Segmentation fault|Bus error)',
             other_regex='unable to raise a stack overflow')
 
     def test_gil_released(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler.enable()
-faulthandler._read_null(True)
-""".strip(),
+            import faulthandler
+            faulthandler.enable()
+            faulthandler._read_null(True)
+            """,
             3,
             '(?:Segmentation fault|Bus error|Illegal instruction)')
 
     def test_enable_file(self):
         with temporary_filename() as filename:
             self.check_fatal_error("""
-import faulthandler
-output = open({filename}, 'wb')
-faulthandler.enable(output)
-faulthandler._sigsegv()
-""".strip().format(filename=repr(filename)),
+                import faulthandler
+                output = open({filename}, 'wb')
+                faulthandler.enable(output)
+                faulthandler._sigsegv()
+                """.format(filename=repr(filename)),
                 4,
                 'Segmentation fault',
                 filename=filename)
 
     def test_enable_single_thread(self):
         self.check_fatal_error("""
-import faulthandler
-faulthandler.enable(all_threads=False)
-faulthandler._sigsegv()
-""".strip(),
+            import faulthandler
+            faulthandler.enable(all_threads=False)
+            faulthandler._sigsegv()
+            """,
             3,
             'Segmentation fault',
             all_threads=False)
 
     def test_disable(self):
         code = """
-import faulthandler
-faulthandler.enable()
-faulthandler.disable()
-faulthandler._sigsegv()
-""".strip()
+            import faulthandler
+            faulthandler.enable()
+            faulthandler.disable()
+            faulthandler._sigsegv()
+            """
         not_expected = 'Fatal Python error'
         stderr, exitcode = self.get_output(code)
         stder = '\n'.join(stderr)
@@ -293,20 +295,20 @@
         Raise an error if the output doesn't match the expected format.
         """
         code = """
-import faulthandler
+            import faulthandler
 
-def funcB():
-    if {has_filename}:
-        with open({filename}, "wb") as fp:
-            faulthandler.dump_traceback(fp, all_threads=False)
-    else:
-        faulthandler.dump_traceback(all_threads=False)
+            def funcB():
+                if {has_filename}:
+                    with open({filename}, "wb") as fp:
+                        faulthandler.dump_traceback(fp, all_threads=False)
+                else:
+                    faulthandler.dump_traceback(all_threads=False)
 
-def funcA():
-    funcB()
+            def funcA():
+                funcB()
 
-funcA()
-""".strip()
+            funcA()
+            """
         code = code.format(
             filename=repr(filename),
             has_filename=bool(filename),
@@ -337,13 +339,13 @@
         func_name = 'x' * (maxlen + 50)
         truncated = 'x' * maxlen + '...'
         code = """
-import faulthandler
+            import faulthandler
 
-def {func_name}():
-    faulthandler.dump_traceback(all_threads=False)
+            def {func_name}():
+                faulthandler.dump_traceback(all_threads=False)
 
-{func_name}()
-""".strip()
+            {func_name}()
+            """
         code = code.format(
             func_name=func_name,
         )
@@ -363,37 +365,37 @@
         Raise an error if the output doesn't match the expected format.
         """
         code = """
-import faulthandler
-from threading import Thread, Event
-import time
+            import faulthandler
+            from threading import Thread, Event
+            import time
 
-def dump():
-    if {filename}:
-        with open({filename}, "wb") as fp:
-            faulthandler.dump_traceback(fp, all_threads=True)
-    else:
-        faulthandler.dump_traceback(all_threads=True)
+            def dump():
+                if {filename}:
+                    with open({filename}, "wb") as fp:
+                        faulthandler.dump_traceback(fp, all_threads=True)
+                else:
+                    faulthandler.dump_traceback(all_threads=True)
 
-class Waiter(Thread):
-    # avoid blocking if the main thread raises an exception.
-    daemon = True
+            class Waiter(Thread):
+                # avoid blocking if the main thread raises an exception.
+                daemon = True
 
-    def __init__(self):
-        Thread.__init__(self)
-        self.running = Event()
-        self.stop = Event()
+                def __init__(self):
+                    Thread.__init__(self)
+                    self.running = Event()
+                    self.stop = Event()
 
-    def run(self):
-        self.running.set()
-        self.stop.wait()
+                def run(self):
+                    self.running.set()
+                    self.stop.wait()
 
-waiter = Waiter()
-waiter.start()
-waiter.running.wait()
-dump()
-waiter.stop.set()
-waiter.join()
-""".strip()
+            waiter = Waiter()
+            waiter.start()
+            waiter.running.wait()
+            dump()
+            waiter.stop.set()
+            waiter.join()
+            """
         code = code.format(filename=repr(filename))
         output, exitcode = self.get_output(code, filename)
         output = '\n'.join(output)
@@ -402,17 +404,17 @@
         else:
             lineno = 10
         regex = """
-^Thread 0x[0-9a-f]+ \(most recent call first\):
-(?:  File ".*threading.py", line [0-9]+ in [_a-z]+
-){{1,3}}  File "<string>", line 23 in run
-  File ".*threading.py", line [0-9]+ in _bootstrap_inner
-  File ".*threading.py", line [0-9]+ in _bootstrap
+            ^Thread 0x[0-9a-f]+ \(most recent call first\):
+            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
+            ){{1,3}}  File "<string>", line 23 in run
+              File ".*threading.py", line [0-9]+ in _bootstrap_inner
+              File ".*threading.py", line [0-9]+ in _bootstrap
 
-Current thread XXX \(most recent call first\):
-  File "<string>", line {lineno} in dump
-  File "<string>", line 28 in <module>$
-""".strip()
-        regex = regex.format(lineno=lineno)
+            Current thread XXX \(most recent call first\):
+              File "<string>", line {lineno} in dump
+              File "<string>", line 28 in <module>$
+            """
+        regex = dedent(regex.format(lineno=lineno)).strip()
         self.assertRegex(output, regex)
         self.assertEqual(exitcode, 0)
 
@@ -433,29 +435,29 @@
         """
         timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
         code = """
-import faulthandler
-import time
+            import faulthandler
+            import time
 
-def func(timeout, repeat, cancel, file, loops):
-    for loop in range(loops):
-        faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
-        if cancel:
-            faulthandler.cancel_dump_traceback_later()
-        time.sleep(timeout * 5)
-        faulthandler.cancel_dump_traceback_later()
+            def func(timeout, repeat, cancel, file, loops):
+                for loop in range(loops):
+                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
+                    if cancel:
+                        faulthandler.cancel_dump_traceback_later()
+                    time.sleep(timeout * 5)
+                    faulthandler.cancel_dump_traceback_later()
 
-timeout = {timeout}
-repeat = {repeat}
-cancel = {cancel}
-loops = {loops}
-if {has_filename}:
-    file = open({filename}, "wb")
-else:
-    file = None
-func(timeout, repeat, cancel, file, loops)
-if file is not None:
-    file.close()
-""".strip()
+            timeout = {timeout}
+            repeat = {repeat}
+            cancel = {cancel}
+            loops = {loops}
+            if {has_filename}:
+                file = open({filename}, "wb")
+            else:
+                file = None
+            func(timeout, repeat, cancel, file, loops)
+            if file is not None:
+                file.close()
+            """
         code = code.format(
             timeout=TIMEOUT,
             repeat=repeat,
@@ -522,45 +524,45 @@
         """
         signum = signal.SIGUSR1
         code = """
-import faulthandler
-import os
-import signal
-import sys
+            import faulthandler
+            import os
+            import signal
+            import sys
 
-def func(signum):
-    os.kill(os.getpid(), signum)
+            def func(signum):
+                os.kill(os.getpid(), signum)
 
-def handler(signum, frame):
-    handler.called = True
-handler.called = False
+            def handler(signum, frame):
+                handler.called = True
+            handler.called = False
 
-exitcode = 0
-signum = {signum}
-unregister = {unregister}
-chain = {chain}
+            exitcode = 0
+            signum = {signum}
+            unregister = {unregister}
+            chain = {chain}
 
-if {has_filename}:
-    file = open({filename}, "wb")
-else:
-    file = None
-if chain:
-    signal.signal(signum, handler)
-faulthandler.register(signum, file=file,
-                      all_threads={all_threads}, chain={chain})
-if unregister:
-    faulthandler.unregister(signum)
-func(signum)
-if chain and not handler.called:
-    if file is not None:
-        output = file
-    else:
-        output = sys.stderr
-    print("Error: signal handler not called!", file=output)
-    exitcode = 1
-if file is not None:
-    file.close()
-sys.exit(exitcode)
-""".strip()
+            if {has_filename}:
+                file = open({filename}, "wb")
+            else:
+                file = None
+            if chain:
+                signal.signal(signum, handler)
+            faulthandler.register(signum, file=file,
+                                  all_threads={all_threads}, chain={chain})
+            if unregister:
+                faulthandler.unregister(signum)
+            func(signum)
+            if chain and not handler.called:
+                if file is not None:
+                    output = file
+                else:
+                    output = sys.stderr
+                print("Error: signal handler not called!", file=output)
+                exitcode = 1
+            if file is not None:
+                file.close()
+            sys.exit(exitcode)
+            """
         code = code.format(
             filename=repr(filename),
             has_filename=bool(filename),

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list