| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- import faulthandler
- import functools
- import gc
- import importlib
- import io
- import os
- import sys
- import time
- import traceback
- import unittest
- from test import support
- from test.support import os_helper
- from test.support import threading_helper
- from test.libregrtest.cmdline import Namespace
- from test.libregrtest.save_env import saved_test_environment
- from test.libregrtest.utils import clear_caches, format_duration, print_warning
- class TestResult:
- def __init__(
- self,
- name: str,
- duration_sec: float = 0.0,
- xml_data: list[str] | None = None,
- ) -> None:
- self.name = name
- self.duration_sec = duration_sec
- self.xml_data = xml_data
- def __str__(self) -> str:
- return f"{self.name} finished"
- class Passed(TestResult):
- def __str__(self) -> str:
- return f"{self.name} passed"
- class Failed(TestResult):
- def __init__(
- self,
- name: str,
- duration_sec: float = 0.0,
- xml_data: list[str] | None = None,
- errors: list[tuple[str, str]] | None = None,
- failures: list[tuple[str, str]] | None = None,
- ) -> None:
- super().__init__(name, duration_sec=duration_sec, xml_data=xml_data)
- self.errors = errors
- self.failures = failures
- def __str__(self) -> str:
- if self.errors and self.failures:
- le = len(self.errors)
- lf = len(self.failures)
- error_s = "error" + ("s" if le > 1 else "")
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.name} failed ({le} {error_s}, {lf} {failure_s})"
- if self.errors:
- le = len(self.errors)
- error_s = "error" + ("s" if le > 1 else "")
- return f"{self.name} failed ({le} {error_s})"
- if self.failures:
- lf = len(self.failures)
- failure_s = "failure" + ("s" if lf > 1 else "")
- return f"{self.name} failed ({lf} {failure_s})"
- return f"{self.name} failed"
- class UncaughtException(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (uncaught exception)"
- class EnvChanged(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (env changed)"
- class RefLeak(Failed):
- def __str__(self) -> str:
- return f"{self.name} failed (reference leak)"
- class Skipped(TestResult):
- def __str__(self) -> str:
- return f"{self.name} skipped"
- class ResourceDenied(Skipped):
- def __str__(self) -> str:
- return f"{self.name} skipped (resource denied)"
- class Interrupted(TestResult):
- def __str__(self) -> str:
- return f"{self.name} interrupted"
- class ChildError(Failed):
- def __str__(self) -> str:
- return f"{self.name} crashed"
- class DidNotRun(TestResult):
- def __str__(self) -> str:
- return f"{self.name} ran no tests"
- class Timeout(Failed):
- def __str__(self) -> str:
- return f"{self.name} timed out ({format_duration(self.duration_sec)})"
- # Minimum duration of a test to display its duration or to mention that
- # the test is running in background
- PROGRESS_MIN_TIME = 30.0 # seconds
- # small set of tests to determine if we have a basically functioning interpreter
- # (i.e. if any of these fail, then anything else is likely to follow)
- STDTESTS = [
- 'test_grammar',
- 'test_opcodes',
- 'test_dict',
- 'test_builtin',
- 'test_exceptions',
- 'test_types',
- 'test_unittest',
- 'test_doctest',
- 'test_doctest2',
- 'test_support'
- ]
- # set of tests that we don't want to be executed when using regrtest
- NOTTESTS = set()
- # Storage of uncollectable objects
- FOUND_GARBAGE = []
- def is_failed(result: TestResult, ns: Namespace) -> bool:
- if isinstance(result, EnvChanged):
- return ns.fail_env_changed
- return isinstance(result, Failed)
- def findtestdir(path=None):
- return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
- def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
- """Return a list of all applicable test modules."""
- testdir = findtestdir(testdir)
- names = os.listdir(testdir)
- tests = []
- others = set(stdtests) | nottests
- for name in names:
- mod, ext = os.path.splitext(name)
- if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
- tests.append(mod)
- return stdtests + sorted(tests)
- def get_abs_module(ns: Namespace, test_name: str) -> str:
- if test_name.startswith('test.') or ns.testdir:
- return test_name
- else:
- # Import it from the test package
- return 'test.' + test_name
- def _runtest(ns: Namespace, test_name: str) -> TestResult:
- # Handle faulthandler timeout, capture stdout+stderr, XML serialization
- # and measure time.
- output_on_failure = ns.verbose3
- use_timeout = (
- ns.timeout is not None and threading_helper.can_start_thread
- )
- if use_timeout:
- faulthandler.dump_traceback_later(ns.timeout, exit=True)
- start_time = time.perf_counter()
- try:
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.junit_xml_list = xml_list = [] if ns.xmlpath else None
- if ns.failfast:
- support.failfast = True
- if output_on_failure:
- support.verbose = True
- stream = io.StringIO()
- orig_stdout = sys.stdout
- orig_stderr = sys.stderr
- print_warning = support.print_warning
- orig_print_warnings_stderr = print_warning.orig_stderr
- output = None
- try:
- sys.stdout = stream
- sys.stderr = stream
- # print_warning() writes into the temporary stream to preserve
- # messages order. If support.environment_altered becomes true,
- # warnings will be written to sys.stderr below.
- print_warning.orig_stderr = stream
- result = _runtest_inner(ns, test_name,
- display_failure=False)
- if not isinstance(result, Passed):
- output = stream.getvalue()
- finally:
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
- print_warning.orig_stderr = orig_print_warnings_stderr
- if output is not None:
- sys.stderr.write(output)
- sys.stderr.flush()
- else:
- # Tell tests to be moderately quiet
- support.verbose = ns.verbose
- result = _runtest_inner(ns, test_name,
- display_failure=not ns.verbose)
- if xml_list:
- import xml.etree.ElementTree as ET
- result.xml_data = [
- ET.tostring(x).decode('us-ascii')
- for x in xml_list
- ]
- result.duration_sec = time.perf_counter() - start_time
- return result
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
- support.junit_xml_list = None
- def runtest(ns: Namespace, test_name: str) -> TestResult:
- """Run a single test.
- ns -- regrtest namespace of options
- test_name -- the name of the test
- Returns a TestResult sub-class depending on the kind of result received.
- If ns.xmlpath is not None, xml_data is a list containing each
- generated testsuite element.
- """
- try:
- return _runtest(ns, test_name)
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- return Failed(test_name)
- def _test_module(the_module):
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- support.run_unittest(tests)
- def save_env(ns: Namespace, test_name: str):
- return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo)
- def _runtest_inner2(ns: Namespace, test_name: str) -> bool:
- # Load the test function, run the test function, handle huntrleaks
- # to detect leaks.
- abstest = get_abs_module(ns, test_name)
- # remove the module from sys.module to reload it if it was already imported
- try:
- del sys.modules[abstest]
- except KeyError:
- pass
- the_module = importlib.import_module(abstest)
- if ns.huntrleaks:
- from test.libregrtest.refleak import dash_R
- # If the test has a test_main, that will run the appropriate
- # tests. If not, use normal unittest test loading.
- test_runner = getattr(the_module, "test_main", None)
- if test_runner is None:
- test_runner = functools.partial(_test_module, the_module)
- try:
- with save_env(ns, test_name):
- if ns.huntrleaks:
- # Return True if the test leaked references
- refleak = dash_R(ns, test_name, test_runner)
- else:
- test_runner()
- refleak = False
- finally:
- # First kill any dangling references to open files etc.
- # This can also issue some ResourceWarnings which would otherwise get
- # triggered during the following test run, and possibly produce
- # failures.
- support.gc_collect()
- cleanup_test_droppings(test_name, ns.verbose)
- if gc.garbage:
- support.environment_altered = True
- print_warning(f"{test_name} created {len(gc.garbage)} "
- f"uncollectable object(s).")
- # move the uncollectable objects somewhere,
- # so we don't see them again
- FOUND_GARBAGE.extend(gc.garbage)
- gc.garbage.clear()
- support.reap_children()
- return refleak
- def _runtest_inner(
- ns: Namespace, test_name: str, display_failure: bool = True
- ) -> TestResult:
- # Detect environment changes, handle exceptions.
- # Reset the environment_altered flag to detect if a test altered
- # the environment
- support.environment_altered = False
- if ns.pgo:
- display_failure = False
- try:
- clear_caches()
- support.gc_collect()
- with save_env(ns, test_name):
- refleak = _runtest_inner2(ns, test_name)
- except support.ResourceDenied as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- return ResourceDenied(test_name)
- except unittest.SkipTest as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- return Skipped(test_name)
- except support.TestFailedWithDetails as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- return Failed(test_name, errors=exc.errors, failures=exc.failures)
- except support.TestFailed as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- return Failed(test_name)
- except support.TestDidNotRun:
- return DidNotRun(test_name)
- except KeyboardInterrupt:
- print()
- return Interrupted(test_name)
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- return UncaughtException(test_name)
- if refleak:
- return RefLeak(test_name)
- if support.environment_altered:
- return EnvChanged(test_name)
- return Passed(test_name)
- def cleanup_test_droppings(test_name: str, verbose: int) -> None:
- # Try to clean up junk commonly left behind. While tests shouldn't leave
- # any files or directories behind, when a test fails that can be tedious
- # for it to arrange. The consequences can be especially nasty on Windows,
- # since if a test leaves a file open, it cannot be deleted by name (while
- # there's nothing we can do about that here either, we can display the
- # name of the offending test, which is a real help).
- for name in (os_helper.TESTFN,):
- if not os.path.exists(name):
- continue
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
- if verbose:
- print_warning(f"{test_name} left behind {kind} {name!r}")
- support.environment_altered = True
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
|