| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896 |
- from contextlib import contextmanager
- import datetime
- import faulthandler
- import os
- import re
- import signal
- import subprocess
- import sys
- from test import support
- from test.support import os_helper
- from test.support import script_helper, is_android
- from test.support import skip_if_sanitizer
- import tempfile
- import unittest
- from textwrap import dedent
- try:
- import _testcapi
- except ImportError:
- _testcapi = None
- if not support.has_subprocess_support:
- raise unittest.SkipTest("test module requires subprocess")
- TIMEOUT = 0.5
- MS_WINDOWS = (os.name == 'nt')
- def expected_traceback(lineno1, lineno2, header, min_count=1):
- regex = header
- regex += ' File "<string>", line %s in func\n' % lineno1
- regex += ' File "<string>", line %s in <module>' % lineno2
- if 1 < min_count:
- return '^' + (regex + '\n') * (min_count - 1) + regex
- else:
- return '^' + regex + '$'
- def skip_segfault_on_android(test):
- # Issue #32138: Raising SIGSEGV on Android may not cause a crash.
- return unittest.skipIf(is_android,
- 'raising SIGSEGV on Android is unreliable')(test)
- @contextmanager
- def temporary_filename():
- filename = tempfile.mktemp()
- try:
- yield filename
- finally:
- os_helper.unlink(filename)
- class FaultHandlerTests(unittest.TestCase):
- def get_output(self, code, filename=None, fd=None):
- """
- Run the specified code in Python (in a new child process) and read the
- output from the standard error or from a file (if filename is set).
- Return the output lines as a list.
- Strip the reference count from the standard error for Python debug
- build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
- thread XXX".
- """
- code = dedent(code).strip()
- pass_fds = []
- if fd is not None:
- pass_fds.append(fd)
- with support.SuppressCrashReport():
- process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
- with process:
- output, stderr = process.communicate()
- exitcode = process.wait()
- output = output.decode('ascii', 'backslashreplace')
- if filename:
- self.assertEqual(output, '')
- with open(filename, "rb") as fp:
- output = fp.read()
- output = output.decode('ascii', 'backslashreplace')
- elif fd is not None:
- self.assertEqual(output, '')
- os.lseek(fd, os.SEEK_SET, 0)
- with open(fd, "rb", closefd=False) as fp:
- output = fp.read()
- output = output.decode('ascii', 'backslashreplace')
- return output.splitlines(), exitcode
- def check_error(self, code, lineno, fatal_error, *,
- filename=None, all_threads=True, other_regex=None,
- fd=None, know_current_thread=True,
- py_fatal_error=False,
- garbage_collecting=False,
- function='<module>'):
- """
- Check that the fault handler for fatal errors is enabled and check the
- traceback from the child process output.
- Raise an error if the output doesn't match the expected format.
- """
- if all_threads:
- if know_current_thread:
- header = 'Current thread 0x[0-9a-f]+'
- else:
- header = 'Thread 0x[0-9a-f]+'
- else:
- header = 'Stack'
- regex = [f'^{fatal_error}']
- if py_fatal_error:
- regex.append("Python runtime state: initialized")
- regex.append('')
- regex.append(fr'{header} \(most recent call first\):')
- if garbage_collecting:
- regex.append(' Garbage-collecting')
- regex.append(fr' File "<string>", line {lineno} in {function}')
- regex = '\n'.join(regex)
- if other_regex:
- regex = f'(?:{regex}|{other_regex})'
- # Enable MULTILINE flag
- regex = f'(?m){regex}'
- output, exitcode = self.get_output(code, filename=filename, fd=fd)
- output = '\n'.join(output)
- self.assertRegex(output, regex)
- self.assertNotEqual(exitcode, 0)
- def check_fatal_error(self, code, line_number, name_regex, func=None, **kw):
- if func:
- name_regex = '%s: %s' % (func, name_regex)
- fatal_error = 'Fatal Python error: %s' % name_regex
- self.check_error(code, line_number, fatal_error, **kw)
- def check_windows_exception(self, code, line_number, name_regex, **kw):
- fatal_error = 'Windows fatal exception: %s' % name_regex
- self.check_error(code, line_number, fatal_error, **kw)
- @unittest.skipIf(sys.platform.startswith('aix'),
- "the first page of memory is a mapped read-only on AIX")
- def test_read_null(self):
- if not MS_WINDOWS:
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._read_null()
- """,
- 3,
- # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
- '(?:Segmentation fault'
- '|Bus error'
- '|Illegal instruction)')
- else:
- self.check_windows_exception("""
- import faulthandler
- faulthandler.enable()
- faulthandler._read_null()
- """,
- 3,
- 'access violation')
- @skip_segfault_on_android
- def test_sigsegv(self):
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._sigsegv()
- """,
- 3,
- 'Segmentation fault')
- @skip_segfault_on_android
- def test_gc(self):
- # bpo-44466: Detect if the GC is running
- self.check_fatal_error("""
- import faulthandler
- import gc
- import sys
- faulthandler.enable()
- class RefCycle:
- def __del__(self):
- faulthandler._sigsegv()
- # create a reference cycle which triggers a fatal
- # error in a destructor
- a = RefCycle()
- b = RefCycle()
- a.b = b
- b.a = a
- # Delete the objects, not the cycle
- a = None
- b = None
- # Break the reference cycle: call __del__()
- gc.collect()
- # Should not reach this line
- print("exit", file=sys.stderr)
- """,
- 9,
- 'Segmentation fault',
- function='__del__',
- garbage_collecting=True)
- def test_fatal_error_c_thread(self):
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._fatal_error_c_thread()
- """,
- 3,
- 'in new thread',
- know_current_thread=False,
- func='faulthandler_fatal_error_thread',
- py_fatal_error=True)
- def test_sigabrt(self):
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._sigabrt()
- """,
- 3,
- 'Aborted')
- @unittest.skipIf(sys.platform == 'win32',
- "SIGFPE cannot be caught on Windows")
- def test_sigfpe(self):
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._sigfpe()
- """,
- 3,
- 'Floating point exception')
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
- @skip_segfault_on_android
- def test_sigbus(self):
- self.check_fatal_error("""
- import faulthandler
- import signal
- faulthandler.enable()
- signal.raise_signal(signal.SIGBUS)
- """,
- 5,
- 'Bus error')
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
- @skip_segfault_on_android
- def test_sigill(self):
- self.check_fatal_error("""
- import faulthandler
- import signal
- faulthandler.enable()
- signal.raise_signal(signal.SIGILL)
- """,
- 5,
- 'Illegal instruction')
- def check_fatal_error_func(self, release_gil):
- # Test that Py_FatalError() dumps a traceback
- with support.SuppressCrashReport():
- self.check_fatal_error(f"""
- import _testcapi
- _testcapi.fatal_error(b'xyz', {release_gil})
- """,
- 2,
- 'xyz',
- func='test_fatal_error',
- py_fatal_error=True)
- def test_fatal_error(self):
- self.check_fatal_error_func(False)
- def test_fatal_error_without_gil(self):
- self.check_fatal_error_func(True)
- @unittest.skipIf(sys.platform.startswith('openbsd'),
- "Issue #12868: sigaltstack() doesn't work on "
- "OpenBSD if Python is compiled with pthread")
- @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
- 'need faulthandler._stack_overflow()')
- def test_stack_overflow(self):
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._stack_overflow()
- """,
- 3,
- '(?:Segmentation fault|Bus error)',
- other_regex='unable to raise a stack overflow')
- @skip_segfault_on_android
- def test_gil_released(self):
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable()
- faulthandler._sigsegv(True)
- """,
- 3,
- 'Segmentation fault')
- @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
- "builds change crashing process output.")
- @skip_segfault_on_android
- def test_enable_file(self):
- with temporary_filename() as filename:
- self.check_fatal_error("""
- import faulthandler
- output = open({filename}, 'wb')
- faulthandler.enable(output)
- faulthandler._sigsegv()
- """.format(filename=repr(filename)),
- 4,
- 'Segmentation fault',
- filename=filename)
- @unittest.skipIf(sys.platform == "win32",
- "subprocess doesn't support pass_fds on Windows")
- @skip_if_sanitizer(memory=True, ub=True, reason="sanitizer "
- "builds change crashing process output.")
- @skip_segfault_on_android
- def test_enable_fd(self):
- with tempfile.TemporaryFile('wb+') as fp:
- fd = fp.fileno()
- self.check_fatal_error("""
- import faulthandler
- import sys
- faulthandler.enable(%s)
- faulthandler._sigsegv()
- """ % fd,
- 4,
- 'Segmentation fault',
- fd=fd)
- @skip_segfault_on_android
- def test_enable_single_thread(self):
- self.check_fatal_error("""
- import faulthandler
- faulthandler.enable(all_threads=False)
- faulthandler._sigsegv()
- """,
- 3,
- 'Segmentation fault',
- all_threads=False)
- @skip_segfault_on_android
- def test_disable(self):
- code = """
- import faulthandler
- faulthandler.enable()
- faulthandler.disable()
- faulthandler._sigsegv()
- """
- not_expected = 'Fatal Python error'
- stderr, exitcode = self.get_output(code)
- stderr = '\n'.join(stderr)
- self.assertTrue(not_expected not in stderr,
- "%r is present in %r" % (not_expected, stderr))
- self.assertNotEqual(exitcode, 0)
- @skip_segfault_on_android
- def test_dump_ext_modules(self):
- code = """
- import faulthandler
- import sys
- # Don't filter stdlib module names
- sys.stdlib_module_names = frozenset()
- faulthandler.enable()
- faulthandler._sigsegv()
- """
- stderr, exitcode = self.get_output(code)
- stderr = '\n'.join(stderr)
- match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$',
- stderr, re.MULTILINE)
- if not match:
- self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
- modules = set(match.group(1).strip().split(', '))
- for name in ('sys', 'faulthandler'):
- self.assertIn(name, modules)
- def test_is_enabled(self):
- orig_stderr = sys.stderr
- try:
- # regrtest may replace sys.stderr by io.StringIO object, but
- # faulthandler.enable() requires that sys.stderr has a fileno()
- # method
- sys.stderr = sys.__stderr__
- was_enabled = faulthandler.is_enabled()
- try:
- faulthandler.enable()
- self.assertTrue(faulthandler.is_enabled())
- faulthandler.disable()
- self.assertFalse(faulthandler.is_enabled())
- finally:
- if was_enabled:
- faulthandler.enable()
- else:
- faulthandler.disable()
- finally:
- sys.stderr = orig_stderr
- @support.requires_subprocess()
- def test_disabled_by_default(self):
- # By default, the module should be disabled
- code = "import faulthandler; print(faulthandler.is_enabled())"
- args = (sys.executable, "-E", "-c", code)
- # don't use assert_python_ok() because it always enables faulthandler
- output = subprocess.check_output(args)
- self.assertEqual(output.rstrip(), b"False")
- @support.requires_subprocess()
- def test_sys_xoptions(self):
- # Test python -X faulthandler
- code = "import faulthandler; print(faulthandler.is_enabled())"
- args = filter(None, (sys.executable,
- "-E" if sys.flags.ignore_environment else "",
- "-X", "faulthandler", "-c", code))
- env = os.environ.copy()
- env.pop("PYTHONFAULTHANDLER", None)
- # don't use assert_python_ok() because it always enables faulthandler
- output = subprocess.check_output(args, env=env)
- self.assertEqual(output.rstrip(), b"True")
- @support.requires_subprocess()
- def test_env_var(self):
- # empty env var
- code = "import faulthandler; print(faulthandler.is_enabled())"
- args = (sys.executable, "-c", code)
- env = dict(os.environ)
- env['PYTHONFAULTHANDLER'] = ''
- env['PYTHONDEVMODE'] = ''
- # don't use assert_python_ok() because it always enables faulthandler
- output = subprocess.check_output(args, env=env)
- self.assertEqual(output.rstrip(), b"False")
- # non-empty env var
- env = dict(os.environ)
- env['PYTHONFAULTHANDLER'] = '1'
- env['PYTHONDEVMODE'] = ''
- output = subprocess.check_output(args, env=env)
- self.assertEqual(output.rstrip(), b"True")
- def check_dump_traceback(self, *, filename=None, fd=None):
- """
- Explicitly call dump_traceback() function and check its output.
- Raise an error if the output doesn't match the expected format.
- """
- code = """
- import faulthandler
- filename = {filename!r}
- fd = {fd}
- def funcB():
- if filename:
- with open(filename, "wb") as fp:
- faulthandler.dump_traceback(fp, all_threads=False)
- elif fd is not None:
- faulthandler.dump_traceback(fd,
- all_threads=False)
- else:
- faulthandler.dump_traceback(all_threads=False)
- def funcA():
- funcB()
- funcA()
- """
- code = code.format(
- filename=filename,
- fd=fd,
- )
- if filename:
- lineno = 9
- elif fd is not None:
- lineno = 11
- else:
- lineno = 14
- expected = [
- 'Stack (most recent call first):',
- ' File "<string>", line %s in funcB' % lineno,
- ' File "<string>", line 17 in funcA',
- ' File "<string>", line 19 in <module>'
- ]
- trace, exitcode = self.get_output(code, filename, fd)
- self.assertEqual(trace, expected)
- self.assertEqual(exitcode, 0)
- def test_dump_traceback(self):
- self.check_dump_traceback()
- def test_dump_traceback_file(self):
- with temporary_filename() as filename:
- self.check_dump_traceback(filename=filename)
- @unittest.skipIf(sys.platform == "win32",
- "subprocess doesn't support pass_fds on Windows")
- def test_dump_traceback_fd(self):
- with tempfile.TemporaryFile('wb+') as fp:
- self.check_dump_traceback(fd=fp.fileno())
- def test_truncate(self):
- maxlen = 500
- func_name = 'x' * (maxlen + 50)
- truncated = 'x' * maxlen + '...'
- code = """
- import faulthandler
- def {func_name}():
- faulthandler.dump_traceback(all_threads=False)
- {func_name}()
- """
- code = code.format(
- func_name=func_name,
- )
- expected = [
- 'Stack (most recent call first):',
- ' File "<string>", line 4 in %s' % truncated,
- ' File "<string>", line 6 in <module>'
- ]
- trace, exitcode = self.get_output(code)
- self.assertEqual(trace, expected)
- self.assertEqual(exitcode, 0)
- def check_dump_traceback_threads(self, filename):
- """
- Call explicitly dump_traceback(all_threads=True) and check the output.
- Raise an error if the output doesn't match the expected format.
- """
- code = """
- import faulthandler
- from threading import Thread, Event
- import time
- def dump():
- if {filename}:
- with open({filename}, "wb") as fp:
- faulthandler.dump_traceback(fp, all_threads=True)
- else:
- faulthandler.dump_traceback(all_threads=True)
- class Waiter(Thread):
- # avoid blocking if the main thread raises an exception.
- daemon = True
- def __init__(self):
- Thread.__init__(self)
- self.running = Event()
- self.stop = Event()
- def run(self):
- self.running.set()
- self.stop.wait()
- waiter = Waiter()
- waiter.start()
- waiter.running.wait()
- dump()
- waiter.stop.set()
- waiter.join()
- """
- code = code.format(filename=repr(filename))
- output, exitcode = self.get_output(code, filename)
- output = '\n'.join(output)
- if filename:
- lineno = 8
- else:
- lineno = 10
- regex = r"""
- ^Thread 0x[0-9a-f]+ \(most recent call first\):
- (?: File ".*threading.py", line [0-9]+ in [_a-z]+
- ){{1,3}} File "<string>", line 23 in run
- File ".*threading.py", line [0-9]+ in _bootstrap_inner
- File ".*threading.py", line [0-9]+ in _bootstrap
- Current thread 0x[0-9a-f]+ \(most recent call first\):
- File "<string>", line {lineno} in dump
- File "<string>", line 28 in <module>$
- """
- regex = dedent(regex.format(lineno=lineno)).strip()
- self.assertRegex(output, regex)
- self.assertEqual(exitcode, 0)
- def test_dump_traceback_threads(self):
- self.check_dump_traceback_threads(None)
- def test_dump_traceback_threads_file(self):
- with temporary_filename() as filename:
- self.check_dump_traceback_threads(filename)
- def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
- *, filename=None, fd=None):
- """
- Check how many times the traceback is written in timeout x 2.5 seconds,
- or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
- on repeat and cancel options.
- Raise an error if the output doesn't match the expect format.
- """
- timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
- code = """
- import faulthandler
- import time
- import sys
- timeout = {timeout}
- repeat = {repeat}
- cancel = {cancel}
- loops = {loops}
- filename = {filename!r}
- fd = {fd}
- def func(timeout, repeat, cancel, file, loops):
- for loop in range(loops):
- faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
- if cancel:
- faulthandler.cancel_dump_traceback_later()
- time.sleep(timeout * 5)
- faulthandler.cancel_dump_traceback_later()
- if filename:
- file = open(filename, "wb")
- elif fd is not None:
- file = sys.stderr.fileno()
- else:
- file = None
- func(timeout, repeat, cancel, file, loops)
- if filename:
- file.close()
- """
- code = code.format(
- timeout=TIMEOUT,
- repeat=repeat,
- cancel=cancel,
- loops=loops,
- filename=filename,
- fd=fd,
- )
- trace, exitcode = self.get_output(code, filename)
- trace = '\n'.join(trace)
- if not cancel:
- count = loops
- if repeat:
- count *= 2
- header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
- regex = expected_traceback(17, 26, header, min_count=count)
- self.assertRegex(trace, regex)
- else:
- self.assertEqual(trace, '')
- self.assertEqual(exitcode, 0)
- def test_dump_traceback_later(self):
- self.check_dump_traceback_later()
- def test_dump_traceback_later_repeat(self):
- self.check_dump_traceback_later(repeat=True)
- def test_dump_traceback_later_cancel(self):
- self.check_dump_traceback_later(cancel=True)
- def test_dump_traceback_later_file(self):
- with temporary_filename() as filename:
- self.check_dump_traceback_later(filename=filename)
- @unittest.skipIf(sys.platform == "win32",
- "subprocess doesn't support pass_fds on Windows")
- def test_dump_traceback_later_fd(self):
- with tempfile.TemporaryFile('wb+') as fp:
- self.check_dump_traceback_later(fd=fp.fileno())
- def test_dump_traceback_later_twice(self):
- self.check_dump_traceback_later(loops=2)
- @unittest.skipIf(not hasattr(faulthandler, "register"),
- "need faulthandler.register")
- def check_register(self, filename=False, all_threads=False,
- unregister=False, chain=False, fd=None):
- """
- Register a handler displaying the traceback on a user signal. Raise the
- signal and check the written traceback.
- If chain is True, check that the previous signal handler is called.
- Raise an error if the output doesn't match the expected format.
- """
- signum = signal.SIGUSR1
- code = """
- import faulthandler
- import os
- import signal
- import sys
- all_threads = {all_threads}
- signum = {signum:d}
- unregister = {unregister}
- chain = {chain}
- filename = {filename!r}
- fd = {fd}
- def func(signum):
- os.kill(os.getpid(), signum)
- def handler(signum, frame):
- handler.called = True
- handler.called = False
- if filename:
- file = open(filename, "wb")
- elif fd is not None:
- file = sys.stderr.fileno()
- else:
- file = None
- if chain:
- signal.signal(signum, handler)
- faulthandler.register(signum, file=file,
- all_threads=all_threads, chain={chain})
- if unregister:
- faulthandler.unregister(signum)
- func(signum)
- if chain and not handler.called:
- if file is not None:
- output = file
- else:
- output = sys.stderr
- print("Error: signal handler not called!", file=output)
- exitcode = 1
- else:
- exitcode = 0
- if filename:
- file.close()
- sys.exit(exitcode)
- """
- code = code.format(
- all_threads=all_threads,
- signum=signum,
- unregister=unregister,
- chain=chain,
- filename=filename,
- fd=fd,
- )
- trace, exitcode = self.get_output(code, filename)
- trace = '\n'.join(trace)
- if not unregister:
- if all_threads:
- regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
- else:
- regex = r'Stack \(most recent call first\):\n'
- regex = expected_traceback(14, 32, regex)
- self.assertRegex(trace, regex)
- else:
- self.assertEqual(trace, '')
- if unregister:
- self.assertNotEqual(exitcode, 0)
- else:
- self.assertEqual(exitcode, 0)
- def test_register(self):
- self.check_register()
- def test_unregister(self):
- self.check_register(unregister=True)
- def test_register_file(self):
- with temporary_filename() as filename:
- self.check_register(filename=filename)
- @unittest.skipIf(sys.platform == "win32",
- "subprocess doesn't support pass_fds on Windows")
- def test_register_fd(self):
- with tempfile.TemporaryFile('wb+') as fp:
- self.check_register(fd=fp.fileno())
- def test_register_threads(self):
- self.check_register(all_threads=True)
- def test_register_chain(self):
- self.check_register(chain=True)
- @contextmanager
- def check_stderr_none(self):
- stderr = sys.stderr
- try:
- sys.stderr = None
- with self.assertRaises(RuntimeError) as cm:
- yield
- self.assertEqual(str(cm.exception), "sys.stderr is None")
- finally:
- sys.stderr = stderr
- def test_stderr_None(self):
- # Issue #21497: provide a helpful error if sys.stderr is None,
- # instead of just an attribute error: "None has no attribute fileno".
- with self.check_stderr_none():
- faulthandler.enable()
- with self.check_stderr_none():
- faulthandler.dump_traceback()
- with self.check_stderr_none():
- faulthandler.dump_traceback_later(1e-3)
- if hasattr(faulthandler, "register"):
- with self.check_stderr_none():
- faulthandler.register(signal.SIGUSR1)
- @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
- def test_raise_exception(self):
- for exc, name in (
- ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
- ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
- ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
- ):
- self.check_windows_exception(f"""
- import faulthandler
- faulthandler.enable()
- faulthandler._raise_exception(faulthandler._{exc})
- """,
- 3,
- name)
- @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
- def test_ignore_exception(self):
- for exc_code in (
- 0xE06D7363, # MSC exception ("Emsc")
- 0xE0434352, # COM Callable Runtime exception ("ECCR")
- ):
- code = f"""
- import faulthandler
- faulthandler.enable()
- faulthandler._raise_exception({exc_code})
- """
- code = dedent(code)
- output, exitcode = self.get_output(code)
- self.assertEqual(output, [])
- self.assertEqual(exitcode, exc_code)
- @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
- def test_raise_nonfatal_exception(self):
- # These exceptions are not strictly errors. Letting
- # faulthandler display the traceback when they are
- # raised is likely to result in noise. However, they
- # may still terminate the process if there is no
- # handler installed for them (which there typically
- # is, e.g. for debug messages).
- for exc in (
- 0x00000000,
- 0x34567890,
- 0x40000000,
- 0x40001000,
- 0x70000000,
- 0x7FFFFFFF,
- ):
- output, exitcode = self.get_output(f"""
- import faulthandler
- faulthandler.enable()
- faulthandler._raise_exception(0x{exc:x})
- """
- )
- self.assertEqual(output, [])
- # On Windows older than 7 SP1, the actual exception code has
- # bit 29 cleared.
- self.assertIn(exitcode,
- (exc, exc & ~0x10000000))
- @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
- def test_disable_windows_exc_handler(self):
- code = dedent("""
- import faulthandler
- faulthandler.enable()
- faulthandler.disable()
- code = faulthandler._EXCEPTION_ACCESS_VIOLATION
- faulthandler._raise_exception(code)
- """)
- output, exitcode = self.get_output(code)
- self.assertEqual(output, [])
- self.assertEqual(exitcode, 0xC0000005)
- def test_cancel_later_without_dump_traceback_later(self):
- # bpo-37933: Calling cancel_dump_traceback_later()
- # without dump_traceback_later() must not segfault.
- code = dedent("""
- import faulthandler
- faulthandler.cancel_dump_traceback_later()
- """)
- output, exitcode = self.get_output(code)
- self.assertEqual(output, [])
- self.assertEqual(exitcode, 0)
- if __name__ == "__main__":
- unittest.main()
|