| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- import faulthandler
- import json
- import os
- import queue
- import signal
- import subprocess
- import sys
- import tempfile
- import threading
- import time
- import traceback
- from typing import NamedTuple, NoReturn, Literal, Any, TextIO
- from test import support
- from test.support import os_helper
- from test.libregrtest.cmdline import Namespace
- from test.libregrtest.main import Regrtest
- from test.libregrtest.runtest import (
- runtest, is_failed, TestResult, Interrupted, Timeout, ChildError, PROGRESS_MIN_TIME)
- from test.libregrtest.setup import setup_tests
- from test.libregrtest.utils import format_duration, print_warning
- if sys.platform == 'win32':
- import locale
# Display the running tests if nothing happened during the last N seconds.
PROGRESS_UPDATE = 30.0   # seconds
assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME

# Kill the main process after 5 minutes. It is supposed to write an update
# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for the slowest Python
# buildbot workers.
MAIN_PROCESS_TIMEOUT = 5 * 60.0
assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE

# Time to wait until a worker completes: should be immediate.
JOIN_TIMEOUT = 30.0   # seconds

# True when the platform offers both os.setsid() and os.killpg(): workers are
# then started in their own session and killed as a whole process group.
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
def must_stop(result: TestResult, ns: Namespace) -> bool:
    """Tell whether the test run has to be aborted after *result*.

    The run stops when the test was interrupted, or when the fail-fast
    option is set and the result counts as a failure.
    """
    interrupted = isinstance(result, Interrupted)
    return interrupted or bool(ns.failfast and is_failed(result, ns))
def parse_worker_args(worker_args) -> tuple[Namespace, str]:
    """Decode the JSON payload given to a worker via ``--worker-args``.

    The payload is a two-item JSON array: the namespace attributes as a
    dict, followed by the name of the test to run.
    """
    options, test_name = json.loads(worker_args)
    return Namespace(**options), test_name
def run_test_in_subprocess(testname: str, ns: Namespace, stdout_fh: TextIO) -> subprocess.Popen:
    """Start a worker interpreter running *testname* and return its Popen.

    The namespace and the test name are serialized to JSON and passed on the
    command line through ``--worker-args``. Both stdout and stderr of the
    child are written to *stdout_fh*.
    """
    payload = json.dumps((vars(ns), testname))

    # ns.python, when set, replaces the current interpreter command.
    interpreter = [sys.executable] if ns.python is None else ns.python

    command = [
        *interpreter,
        *support.args_from_interpreter_flags(),
        '-u',    # Unbuffered stdout and stderr
        '-m', 'test.regrtest',
        '--worker-args', payload,
    ]

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    popen_options = {
        'stdout': stdout_fh,
        # bpo-45410: Write stderr into stdout to keep messages order
        'stderr': stdout_fh,
        'text': True,
        'close_fds': (os.name != 'nt'),
        'cwd': os_helper.SAVEDCWD,
    }
    if USE_PROCESS_GROUP:
        # Give the child its own session so that the whole process group
        # can be killed later.
        popen_options['start_new_session'] = True

    return subprocess.Popen(command, **popen_options)
def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
    """Worker entry point: run one test, print its result as JSON, exit.

    The JSON document is written as the last line of stdout, where the
    parent process expects to find it.
    """
    setup_tests(ns)
    outcome = runtest(ns, test_name)

    # Force a newline (just in case) so the JSON starts on its own line.
    print()

    # Serialize TestResult as dict in JSON
    serialized = json.dumps(outcome, cls=EncodeTestResult)
    print(serialized, flush=True)
    sys.exit(0)
# A plain class is used instead of a generator so that several threads can
# call next() on the same object.
class MultiprocessIterator:
    """A thread-safe iterator over tests for multiprocess mode."""

    def __init__(self, tests_iter):
        self.lock = threading.Lock()
        self.tests_iter = tests_iter

    def __iter__(self):
        return self

    def __next__(self):
        # The lock serializes access to the wrapped iterator.
        with self.lock:
            source = self.tests_iter
            if source is not None:
                return next(source)
        # stop() was called: behave like an exhausted iterator.
        raise StopIteration

    def stop(self):
        """Drop the wrapped iterator so every later next() stops iteration."""
        with self.lock:
            self.tests_iter = None
class MultiprocessResult(NamedTuple):
    """Outcome of one test executed in a worker process."""
    # Result object describing how the test ended.
    result: TestResult
    # bpo-45410: stderr is written into stdout to keep messages order
    stdout: str
    # Description of a worker-level failure, or None when there is none.
    error_msg: str | None
# Formatted traceback of an exception raised inside a worker thread.
ExcStr = str
# Item put on the output queue by worker threads: (False, result) for a
# completed test, or (True, traceback) when the thread itself failed.
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
class ExitThread(Exception):
    """Raised inside a worker thread to make it leave its run loop."""
class TestWorkerProcess(threading.Thread):
    """Thread that runs pending tests, each one in its own child process.

    The thread takes test names from the runner's shared iterator, spawns a
    worker interpreter for each of them, and puts the outcome on the runner's
    output queue as ``(False, MultiprocessResult)``. If the thread itself
    fails, it puts ``(True, formatted_traceback)`` instead and exits.
    """

    def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
        super().__init__()
        self.worker_id = worker_id
        self.pending = runner.pending
        self.output = runner.output
        self.ns = runner.ns
        self.timeout = runner.worker_timeout
        self.regrtest = runner.regrtest
        # Name of the test being run, or None between two tests.
        self.current_test_name = None
        # time.monotonic() value taken when the current test was started.
        self.start_time = None
        # Popen object of the running child process, or None.
        self._popen = None
        # True once _kill() has been issued for the current child process.
        self._killed = False
        # True once stop() has been called from another thread.
        self._stopped = False

    def __repr__(self) -> str:
        info = [f'TestWorkerProcess #{self.worker_id}']
        if self.is_alive():
            info.append("running")
        else:
            info.append('stopped')
        test = self.current_test_name
        if test:
            info.append(f'test={test}')
        # Work on a snapshot: the worker thread resets self._popen to None
        # when the child completes, possibly while another thread is
        # formatting this object.
        popen = self._popen
        if popen is not None:
            dt = time.monotonic() - self.start_time
            info.extend((f'pid={popen.pid}',
                         f'time={format_duration(dt)}'))
        return '<%s>' % ' '.join(info)

    def _kill(self) -> None:
        """Kill the child process (or its process group), at most once."""
        popen = self._popen
        if popen is None:
            return

        if self._killed:
            return
        self._killed = True

        if USE_PROCESS_GROUP:
            what = f"{self} process group"
        else:
            what = f"{self}"

        print(f"Kill {what}", file=sys.stderr, flush=True)
        try:
            if USE_PROCESS_GROUP:
                os.killpg(popen.pid, signal.SIGKILL)
            else:
                popen.kill()
        except ProcessLookupError:
            # popen.kill(): the process completed, the TestWorkerProcess thread
            # read its exit status, but Popen.send_signal() read the returncode
            # just before Popen.wait() set returncode.
            pass
        except OSError as exc:
            print_warning(f"Failed to kill {what}: {exc!r}")

    def stop(self) -> None:
        """Ask the thread to exit and kill its child process, if any."""
        # Method called from a different thread to stop this thread
        self._stopped = True
        self._kill()

    def mp_result_error(
        self,
        test_result: TestResult,
        stdout: str = '',
        err_msg: str | None = None
    ) -> MultiprocessResult:
        """Build the result of a test that did not complete normally.

        The duration stored in *test_result* is the time elapsed since the
        current test was started.
        """
        test_result.duration_sec = time.monotonic() - self.start_time
        return MultiprocessResult(test_result, stdout, err_msg)

    def _run_process(self, test_name: str, stdout_fh: TextIO) -> int | None:
        """Run *test_name* in a child process and wait for its completion.

        Return the exit code of the child, or None if it had to be killed
        because it ran longer than ``self.timeout``. Raise ExitThread if
        stop() was called in the meantime.
        """
        self.start_time = time.monotonic()

        self.current_test_name = test_name
        try:
            popen = run_test_in_subprocess(test_name, self.ns, stdout_fh)

            self._killed = False
            self._popen = popen
        except:
            self.current_test_name = None
            raise

        try:
            if self._stopped:
                # If stop() has been called before self._popen is set,
                # the process is still running. Call _kill() again
                # to ensure that the process is killed.
                self._kill()
                raise ExitThread

            try:
                # gh-94026: stdout+stderr are written to tempfile
                retcode = popen.wait(timeout=self.timeout)
                assert retcode is not None
                return retcode
            except subprocess.TimeoutExpired:
                if self._stopped:
                    # stop() has been called: this is a normal shutdown,
                    # not a test timeout.
                    raise ExitThread

                # On timeout, kill the process
                self._kill()

                # None means TIMEOUT for the caller
                return None
            except OSError:
                if self._stopped:
                    # stop() has been called: an error while waiting for
                    # the killed process is part of the normal shutdown.
                    raise ExitThread
                raise
        except:
            # Bare except on purpose: the child must be killed whatever the
            # exception is (KeyboardInterrupt included) before re-raising.
            self._kill()
            raise
        finally:
            self._wait_completed()
            self._popen = None
            self.current_test_name = None

    def _runtest(self, test_name: str) -> MultiprocessResult:
        """Run *test_name* in a child process and decode its output.

        The last line of the child output is expected to be the JSON
        document describing the test result; everything before that line is
        reported as the test output.
        """
        if sys.platform == 'win32':
            # gh-95027: When stdout is not a TTY, Python uses the ANSI code
            # page for the sys.stdout encoding. If the main process runs in a
            # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
            encoding = locale.getencoding()
        else:
            encoding = sys.stdout.encoding

        # gh-94026: Write stdout+stderr to a tempfile as workaround for
        # non-blocking pipes on Emscripten with NodeJS.
        with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
            retcode = self._run_process(test_name, stdout_fh)
            stdout_fh.seek(0)
            stdout = stdout_fh.read().strip()

        if retcode is None:
            return self.mp_result_error(Timeout(test_name), stdout)

        err_msg = None
        if retcode != 0:
            err_msg = "Exit code %s" % retcode
        else:
            stdout, _, result = stdout.rpartition("\n")
            stdout = stdout.rstrip()
            if not result:
                err_msg = "Failed to parse worker stdout"
            else:
                try:
                    # deserialize run_tests_worker() output
                    result = json.loads(result, object_hook=decode_test_result)
                except Exception as exc:
                    err_msg = "Failed to parse worker JSON: %s" % exc
                else:
                    # A last line which is valid JSON but does not describe
                    # a test result must not reach the main thread as if it
                    # were one.
                    if not isinstance(result, TestResult):
                        err_msg = ("Failed to parse worker JSON: "
                                   "unexpected value %r" % (result,))

        if err_msg is not None:
            return self.mp_result_error(ChildError(test_name), stdout, err_msg)

        return MultiprocessResult(result, stdout, err_msg)

    def run(self) -> None:
        """Thread body: run pending tests until told to stop."""
        while not self._stopped:
            try:
                try:
                    test_name = next(self.pending)
                except StopIteration:
                    break

                mp_result = self._runtest(test_name)
                self.output.put((False, mp_result))

                if must_stop(mp_result.result, self.ns):
                    break
            except ExitThread:
                break
            except BaseException:
                # Report the failure of the thread itself to the main thread.
                self.output.put((True, traceback.format_exc()))
                break

    def _wait_completed(self) -> None:
        """Wait up to JOIN_TIMEOUT seconds for the child process to exit."""
        popen = self._popen

        try:
            popen.wait(JOIN_TIMEOUT)
        except (subprocess.TimeoutExpired, OSError) as exc:
            print_warning(f"Failed to wait for {self} completion "
                          f"(timeout={format_duration(JOIN_TIMEOUT)}): "
                          f"{exc!r}")

    def wait_stopped(self, start_time: float) -> None:
        """Join the thread, logging progress, for at most JOIN_TIMEOUT.

        *start_time* is the time.monotonic() value from which the elapsed
        time is measured.
        """
        # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
        # which killed the process. Sometimes, killing the process from the
        # main thread does not interrupt the blocking wait on the child in
        # the TestWorkerProcess thread. This loop with a timeout is a
        # workaround for that.
        #
        # Moreover, if this method fails to join the thread, it is likely
        # that Python will hang at exit while calling threading._shutdown()
        # which tries again to join the blocked thread. Regrtest.main()
        # uses EXIT_TIMEOUT to workaround this second bug.
        while True:
            # Write a message every second
            self.join(1.0)
            if not self.is_alive():
                break
            dt = time.monotonic() - start_time
            self.regrtest.log(f"Waiting for {self} thread "
                              f"for {format_duration(dt)}")
            if dt > JOIN_TIMEOUT:
                print_warning(f"Failed to join {self} in {format_duration(dt)}")
                break
def get_running(workers: list[TestWorkerProcess]) -> list[str]:
    """Describe the tests that have been running for a noticeable time.

    Return one ``"test_name (duration)"`` string for each worker whose
    current test has been running for at least PROGRESS_MIN_TIME seconds.
    """
    running = []
    for worker in workers:
        # Read the attribute once: the worker thread resets it to None when
        # its test completes.
        current_test_name = worker.current_test_name
        if not current_test_name:
            continue
        dt = time.monotonic() - worker.start_time
        if dt >= PROGRESS_MIN_TIME:
            running.append(f'{current_test_name} ({format_duration(dt)})')
    return running
class MultiprocessTestRunner:
    """Run the tests of a Regrtest instance using several worker threads.

    Each worker thread runs tests in child processes and reports on a shared
    queue; this class starts the workers, consumes the queue, displays the
    progress and stops the workers at the end.
    """

    def __init__(self, regrtest: Regrtest) -> None:
        self.regrtest = regrtest
        self.log = self.regrtest.log
        self.ns = regrtest.ns
        self.output: queue.Queue[QueueOutput] = queue.Queue()
        self.pending = MultiprocessIterator(self.regrtest.tests)

        test_timeout = self.ns.timeout
        if test_timeout is None:
            self.worker_timeout = None
        else:
            # Rely on faulthandler to kill a worker process. This timeout
            # is used when faulthandler fails to kill a worker process.
            # Give faulthandler a maximum of 5 minutes to kill the worker.
            self.worker_timeout = min(test_timeout * 1.5,
                                      test_timeout + 5 * 60)
        self.workers = None

    def start_workers(self) -> None:
        """Create one thread per requested process and start them all."""
        worker_ids = range(1, self.ns.use_mp + 1)
        self.workers = [TestWorkerProcess(worker_id, self)
                        for worker_id in worker_ids]

        banner = f"Run tests in parallel using {len(self.workers)} child processes"
        if self.ns.timeout:
            timeouts = (format_duration(self.ns.timeout),
                        format_duration(self.worker_timeout))
            banner += " (timeout: %s, worker timeout: %s)" % timeouts
        self.log(banner)

        for thread in self.workers:
            thread.start()

    def stop_workers(self) -> None:
        """Ask every worker to stop, then wait for all of them."""
        began = time.monotonic()
        for thread in self.workers:
            thread.stop()
        for thread in self.workers:
            thread.wait_stopped(began)

    def _get_result(self) -> QueueOutput | None:
        """Return the next queue item, or None once no more will come."""
        watchdog = self.ns.timeout is not None
        poll_interval = PROGRESS_UPDATE

        # bpo-46205: check the status of workers every iteration to avoid
        # waiting forever on an empty queue.
        while True:
            if not any(thread.is_alive() for thread in self.workers):
                break

            if watchdog:
                faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
                                                  exit=True)

            # wait for a thread
            try:
                return self.output.get(timeout=poll_interval)
            except queue.Empty:
                pass

            # nothing arrived during the interval: display progress
            active = get_running(self.workers)
            if active and not self.ns.pgo:
                self.log('running: %s' % ', '.join(active))

        # all worker threads are done: consume pending results
        try:
            return self.output.get(timeout=0)
        except queue.Empty:
            return None

    def display_result(self, mp_result: MultiprocessResult) -> None:
        """Display the progress line for one completed test."""
        outcome = mp_result.result
        pieces = [str(outcome)]

        if mp_result.error_msg is not None:
            # CHILD_ERROR
            pieces.append(' (%s)' % mp_result.error_msg)
        elif outcome.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo:
            pieces.append(' (%s)' % format_duration(outcome.duration_sec))

        active = get_running(self.workers)
        if active and not self.ns.pgo:
            pieces.append(' -- running: %s' % ', '.join(active))

        self.regrtest.display_progress(self.test_index, ''.join(pieces))

    def _process_result(self, item: QueueOutput) -> bool:
        """Returns True if test runner must stop."""
        thread_failed, payload = item
        if thread_failed:
            # Thread got an exception: payload is the formatted traceback
            print_warning(f"regrtest worker thread failed: {payload}")
            return True

        self.test_index += 1
        self.regrtest.accumulate_result(payload.result)
        self.display_result(payload)

        if payload.stdout:
            print(payload.stdout, flush=True)

        return must_stop(payload.result, self.ns)

    def run_tests(self) -> None:
        """Start the workers and process results until the run is over."""
        self.start_workers()

        self.test_index = 0
        try:
            while (item := self._get_result()) is not None:
                if self._process_result(item):
                    break
        except KeyboardInterrupt:
            print()
            self.regrtest.interrupted = True
        finally:
            if self.ns.timeout is not None:
                faulthandler.cancel_dump_traceback_later()

            # Always ensure that all worker processes are no longer
            # running when we exit this function
            self.pending.stop()
            self.stop_workers()
def run_tests_multiprocess(regrtest: Regrtest) -> None:
    """Run the tests of *regrtest* with a MultiprocessTestRunner."""
    runner = MultiprocessTestRunner(regrtest)
    runner.run_tests()
class EncodeTestResult(json.JSONEncoder):
    """Encode a TestResult (sub)class object into a JSON dict."""

    def default(self, o: Any) -> dict[str, Any]:
        """Return the attributes of a TestResult plus its class name.

        The class name is stored under the ``"__test_result__"`` key. Any
        other object is delegated to the base class, which raises TypeError.
        """
        if isinstance(o, TestResult):
            # Copy the attributes: vars(o) is the live __dict__ of the
            # instance, and adding the marker key to it would set a new
            # attribute on the object being encoded.
            result = dict(vars(o))
            result["__test_result__"] = o.__class__.__name__
            return result

        return super().default(o)
def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
    """Decode a TestResult (sub)class object from a JSON dict.

    A dict without the ``"__test_result__"`` key is returned unchanged.
    Otherwise the key is removed from *d* and the remaining items are passed
    as keyword arguments to the TestResult (sub)class of that name.

    Raise ValueError if no such class exists, rather than silently
    returning None in place of a result.
    """
    if "__test_result__" not in d:
        return d

    cls_name = d.pop("__test_result__")
    for cls in get_all_test_result_classes():
        if cls.__name__ == cls_name:
            return cls(**d)

    raise ValueError(f"unknown test result class: {cls_name!r}")
def get_all_test_result_classes() -> set[type[TestResult]]:
    """Return TestResult together with all its direct and indirect subclasses."""
    found = {TestResult}
    to_visit = [TestResult]
    while to_visit:
        parent = to_visit.pop()
        for child in parent.__subclasses__():
            # A class reachable through several bases is visited only once.
            if child not in found:
                found.add(child)
                to_visit.append(child)
    return found
|