runtest_mp.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. import faulthandler
  2. import json
  3. import os
  4. import queue
  5. import signal
  6. import subprocess
  7. import sys
  8. import tempfile
  9. import threading
  10. import time
  11. import traceback
  12. from typing import NamedTuple, NoReturn, Literal, Any, TextIO
  13. from test import support
  14. from test.support import os_helper
  15. from test.libregrtest.cmdline import Namespace
  16. from test.libregrtest.main import Regrtest
  17. from test.libregrtest.runtest import (
  18. runtest, is_failed, TestResult, Interrupted, Timeout, ChildError, PROGRESS_MIN_TIME)
  19. from test.libregrtest.setup import setup_tests
  20. from test.libregrtest.utils import format_duration, print_warning
  21. if sys.platform == 'win32':
  22. import locale
  23. # Display the running tests if nothing happened last N seconds
  24. PROGRESS_UPDATE = 30.0 # seconds
  25. assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
  26. # Kill the main process after 5 minutes. It is supposed to write an update
  27. # every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
  28. # buildbot workers.
  29. MAIN_PROCESS_TIMEOUT = 5 * 60.0
  30. assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
  31. # Time to wait until a worker completes: should be immediate
  32. JOIN_TIMEOUT = 30.0 # seconds
  33. USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
  34. def must_stop(result: TestResult, ns: Namespace) -> bool:
  35. if isinstance(result, Interrupted):
  36. return True
  37. if ns.failfast and is_failed(result, ns):
  38. return True
  39. return False
  40. def parse_worker_args(worker_args) -> tuple[Namespace, str]:
  41. ns_dict, test_name = json.loads(worker_args)
  42. ns = Namespace(**ns_dict)
  43. return (ns, test_name)
  44. def run_test_in_subprocess(testname: str, ns: Namespace, stdout_fh: TextIO) -> subprocess.Popen:
  45. ns_dict = vars(ns)
  46. worker_args = (ns_dict, testname)
  47. worker_args = json.dumps(worker_args)
  48. if ns.python is not None:
  49. executable = ns.python
  50. else:
  51. executable = [sys.executable]
  52. cmd = [*executable, *support.args_from_interpreter_flags(),
  53. '-u', # Unbuffered stdout and stderr
  54. '-m', 'test.regrtest',
  55. '--worker-args', worker_args]
  56. # Running the child from the same working directory as regrtest's original
  57. # invocation ensures that TEMPDIR for the child is the same when
  58. # sysconfig.is_python_build() is true. See issue 15300.
  59. kw = dict(
  60. stdout=stdout_fh,
  61. # bpo-45410: Write stderr into stdout to keep messages order
  62. stderr=stdout_fh,
  63. text=True,
  64. close_fds=(os.name != 'nt'),
  65. cwd=os_helper.SAVEDCWD,
  66. )
  67. if USE_PROCESS_GROUP:
  68. kw['start_new_session'] = True
  69. return subprocess.Popen(cmd, **kw)
  70. def run_tests_worker(ns: Namespace, test_name: str) -> NoReturn:
  71. setup_tests(ns)
  72. result = runtest(ns, test_name)
  73. print() # Force a newline (just in case)
  74. # Serialize TestResult as dict in JSON
  75. print(json.dumps(result, cls=EncodeTestResult), flush=True)
  76. sys.exit(0)
  77. # We do not use a generator so multiple threads can call next().
  78. class MultiprocessIterator:
  79. """A thread-safe iterator over tests for multiprocess mode."""
  80. def __init__(self, tests_iter):
  81. self.lock = threading.Lock()
  82. self.tests_iter = tests_iter
  83. def __iter__(self):
  84. return self
  85. def __next__(self):
  86. with self.lock:
  87. if self.tests_iter is None:
  88. raise StopIteration
  89. return next(self.tests_iter)
  90. def stop(self):
  91. with self.lock:
  92. self.tests_iter = None
  93. class MultiprocessResult(NamedTuple):
  94. result: TestResult
  95. # bpo-45410: stderr is written into stdout to keep messages order
  96. stdout: str
  97. error_msg: str
  98. ExcStr = str
  99. QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
  100. class ExitThread(Exception):
  101. pass
  102. class TestWorkerProcess(threading.Thread):
  103. def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
  104. super().__init__()
  105. self.worker_id = worker_id
  106. self.pending = runner.pending
  107. self.output = runner.output
  108. self.ns = runner.ns
  109. self.timeout = runner.worker_timeout
  110. self.regrtest = runner.regrtest
  111. self.current_test_name = None
  112. self.start_time = None
  113. self._popen = None
  114. self._killed = False
  115. self._stopped = False
  116. def __repr__(self) -> str:
  117. info = [f'TestWorkerProcess #{self.worker_id}']
  118. if self.is_alive():
  119. info.append("running")
  120. else:
  121. info.append('stopped')
  122. test = self.current_test_name
  123. if test:
  124. info.append(f'test={test}')
  125. popen = self._popen
  126. if popen is not None:
  127. dt = time.monotonic() - self.start_time
  128. info.extend((f'pid={self._popen.pid}',
  129. f'time={format_duration(dt)}'))
  130. return '<%s>' % ' '.join(info)
  131. def _kill(self) -> None:
  132. popen = self._popen
  133. if popen is None:
  134. return
  135. if self._killed:
  136. return
  137. self._killed = True
  138. if USE_PROCESS_GROUP:
  139. what = f"{self} process group"
  140. else:
  141. what = f"{self}"
  142. print(f"Kill {what}", file=sys.stderr, flush=True)
  143. try:
  144. if USE_PROCESS_GROUP:
  145. os.killpg(popen.pid, signal.SIGKILL)
  146. else:
  147. popen.kill()
  148. except ProcessLookupError:
  149. # popen.kill(): the process completed, the TestWorkerProcess thread
  150. # read its exit status, but Popen.send_signal() read the returncode
  151. # just before Popen.wait() set returncode.
  152. pass
  153. except OSError as exc:
  154. print_warning(f"Failed to kill {what}: {exc!r}")
  155. def stop(self) -> None:
  156. # Method called from a different thread to stop this thread
  157. self._stopped = True
  158. self._kill()
  159. def mp_result_error(
  160. self,
  161. test_result: TestResult,
  162. stdout: str = '',
  163. err_msg=None
  164. ) -> MultiprocessResult:
  165. test_result.duration_sec = time.monotonic() - self.start_time
  166. return MultiprocessResult(test_result, stdout, err_msg)
  167. def _run_process(self, test_name: str, stdout_fh: TextIO) -> int:
  168. self.start_time = time.monotonic()
  169. self.current_test_name = test_name
  170. try:
  171. popen = run_test_in_subprocess(test_name, self.ns, stdout_fh)
  172. self._killed = False
  173. self._popen = popen
  174. except:
  175. self.current_test_name = None
  176. raise
  177. try:
  178. if self._stopped:
  179. # If kill() has been called before self._popen is set,
  180. # self._popen is still running. Call again kill()
  181. # to ensure that the process is killed.
  182. self._kill()
  183. raise ExitThread
  184. try:
  185. # gh-94026: stdout+stderr are written to tempfile
  186. retcode = popen.wait(timeout=self.timeout)
  187. assert retcode is not None
  188. return retcode
  189. except subprocess.TimeoutExpired:
  190. if self._stopped:
  191. # kill() has been called: communicate() fails on reading
  192. # closed stdout
  193. raise ExitThread
  194. # On timeout, kill the process
  195. self._kill()
  196. # None means TIMEOUT for the caller
  197. retcode = None
  198. # bpo-38207: Don't attempt to call communicate() again: on it
  199. # can hang until all child processes using stdout
  200. # pipes completes.
  201. except OSError:
  202. if self._stopped:
  203. # kill() has been called: communicate() fails
  204. # on reading closed stdout
  205. raise ExitThread
  206. raise
  207. except:
  208. self._kill()
  209. raise
  210. finally:
  211. self._wait_completed()
  212. self._popen = None
  213. self.current_test_name = None
  214. def _runtest(self, test_name: str) -> MultiprocessResult:
  215. if sys.platform == 'win32':
  216. # gh-95027: When stdout is not a TTY, Python uses the ANSI code
  217. # page for the sys.stdout encoding. If the main process runs in a
  218. # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding.
  219. encoding = locale.getencoding()
  220. else:
  221. encoding = sys.stdout.encoding
  222. # gh-94026: Write stdout+stderr to a tempfile as workaround for
  223. # non-blocking pipes on Emscripten with NodeJS.
  224. with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_fh:
  225. # gh-93353: Check for leaked temporary files in the parent process,
  226. # since the deletion of temporary files can happen late during
  227. # Python finalization: too late for libregrtest.
  228. retcode = self._run_process(test_name, stdout_fh)
  229. stdout_fh.seek(0)
  230. stdout = stdout_fh.read().strip()
  231. if retcode is None:
  232. return self.mp_result_error(Timeout(test_name), stdout)
  233. err_msg = None
  234. if retcode != 0:
  235. err_msg = "Exit code %s" % retcode
  236. else:
  237. stdout, _, result = stdout.rpartition("\n")
  238. stdout = stdout.rstrip()
  239. if not result:
  240. err_msg = "Failed to parse worker stdout"
  241. else:
  242. try:
  243. # deserialize run_tests_worker() output
  244. result = json.loads(result, object_hook=decode_test_result)
  245. except Exception as exc:
  246. err_msg = "Failed to parse worker JSON: %s" % exc
  247. if err_msg is not None:
  248. return self.mp_result_error(ChildError(test_name), stdout, err_msg)
  249. return MultiprocessResult(result, stdout, err_msg)
  250. def run(self) -> None:
  251. while not self._stopped:
  252. try:
  253. try:
  254. test_name = next(self.pending)
  255. except StopIteration:
  256. break
  257. mp_result = self._runtest(test_name)
  258. self.output.put((False, mp_result))
  259. if must_stop(mp_result.result, self.ns):
  260. break
  261. except ExitThread:
  262. break
  263. except BaseException:
  264. self.output.put((True, traceback.format_exc()))
  265. break
  266. def _wait_completed(self) -> None:
  267. popen = self._popen
  268. try:
  269. popen.wait(JOIN_TIMEOUT)
  270. except (subprocess.TimeoutExpired, OSError) as exc:
  271. print_warning(f"Failed to wait for {self} completion "
  272. f"(timeout={format_duration(JOIN_TIMEOUT)}): "
  273. f"{exc!r}")
  274. def wait_stopped(self, start_time: float) -> None:
  275. # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
  276. # which killed the process. Sometimes, killing the process from the
  277. # main thread does not interrupt popen.communicate() in
  278. # TestWorkerProcess thread. This loop with a timeout is a workaround
  279. # for that.
  280. #
  281. # Moreover, if this method fails to join the thread, it is likely
  282. # that Python will hang at exit while calling threading._shutdown()
  283. # which tries again to join the blocked thread. Regrtest.main()
  284. # uses EXIT_TIMEOUT to workaround this second bug.
  285. while True:
  286. # Write a message every second
  287. self.join(1.0)
  288. if not self.is_alive():
  289. break
  290. dt = time.monotonic() - start_time
  291. self.regrtest.log(f"Waiting for {self} thread "
  292. f"for {format_duration(dt)}")
  293. if dt > JOIN_TIMEOUT:
  294. print_warning(f"Failed to join {self} in {format_duration(dt)}")
  295. break
  296. def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
  297. running = []
  298. for worker in workers:
  299. current_test_name = worker.current_test_name
  300. if not current_test_name:
  301. continue
  302. dt = time.monotonic() - worker.start_time
  303. if dt >= PROGRESS_MIN_TIME:
  304. text = '%s (%s)' % (current_test_name, format_duration(dt))
  305. running.append(text)
  306. return running
  307. class MultiprocessTestRunner:
  308. def __init__(self, regrtest: Regrtest) -> None:
  309. self.regrtest = regrtest
  310. self.log = self.regrtest.log
  311. self.ns = regrtest.ns
  312. self.output: queue.Queue[QueueOutput] = queue.Queue()
  313. self.pending = MultiprocessIterator(self.regrtest.tests)
  314. if self.ns.timeout is not None:
  315. # Rely on faulthandler to kill a worker process. This timouet is
  316. # when faulthandler fails to kill a worker process. Give a maximum
  317. # of 5 minutes to faulthandler to kill the worker.
  318. self.worker_timeout = min(self.ns.timeout * 1.5,
  319. self.ns.timeout + 5 * 60)
  320. else:
  321. self.worker_timeout = None
  322. self.workers = None
  323. def start_workers(self) -> None:
  324. self.workers = [TestWorkerProcess(index, self)
  325. for index in range(1, self.ns.use_mp + 1)]
  326. msg = f"Run tests in parallel using {len(self.workers)} child processes"
  327. if self.ns.timeout:
  328. msg += (" (timeout: %s, worker timeout: %s)"
  329. % (format_duration(self.ns.timeout),
  330. format_duration(self.worker_timeout)))
  331. self.log(msg)
  332. for worker in self.workers:
  333. worker.start()
  334. def stop_workers(self) -> None:
  335. start_time = time.monotonic()
  336. for worker in self.workers:
  337. worker.stop()
  338. for worker in self.workers:
  339. worker.wait_stopped(start_time)
  340. def _get_result(self) -> QueueOutput | None:
  341. use_faulthandler = (self.ns.timeout is not None)
  342. timeout = PROGRESS_UPDATE
  343. # bpo-46205: check the status of workers every iteration to avoid
  344. # waiting forever on an empty queue.
  345. while any(worker.is_alive() for worker in self.workers):
  346. if use_faulthandler:
  347. faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
  348. exit=True)
  349. # wait for a thread
  350. try:
  351. return self.output.get(timeout=timeout)
  352. except queue.Empty:
  353. pass
  354. # display progress
  355. running = get_running(self.workers)
  356. if running and not self.ns.pgo:
  357. self.log('running: %s' % ', '.join(running))
  358. # all worker threads are done: consume pending results
  359. try:
  360. return self.output.get(timeout=0)
  361. except queue.Empty:
  362. return None
  363. def display_result(self, mp_result: MultiprocessResult) -> None:
  364. result = mp_result.result
  365. text = str(result)
  366. if mp_result.error_msg is not None:
  367. # CHILD_ERROR
  368. text += ' (%s)' % mp_result.error_msg
  369. elif (result.duration_sec >= PROGRESS_MIN_TIME and not self.ns.pgo):
  370. text += ' (%s)' % format_duration(result.duration_sec)
  371. running = get_running(self.workers)
  372. if running and not self.ns.pgo:
  373. text += ' -- running: %s' % ', '.join(running)
  374. self.regrtest.display_progress(self.test_index, text)
  375. def _process_result(self, item: QueueOutput) -> bool:
  376. """Returns True if test runner must stop."""
  377. if item[0]:
  378. # Thread got an exception
  379. format_exc = item[1]
  380. print_warning(f"regrtest worker thread failed: {format_exc}")
  381. return True
  382. self.test_index += 1
  383. mp_result = item[1]
  384. self.regrtest.accumulate_result(mp_result.result)
  385. self.display_result(mp_result)
  386. if mp_result.stdout:
  387. print(mp_result.stdout, flush=True)
  388. if must_stop(mp_result.result, self.ns):
  389. return True
  390. return False
  391. def run_tests(self) -> None:
  392. self.start_workers()
  393. self.test_index = 0
  394. try:
  395. while True:
  396. item = self._get_result()
  397. if item is None:
  398. break
  399. stop = self._process_result(item)
  400. if stop:
  401. break
  402. except KeyboardInterrupt:
  403. print()
  404. self.regrtest.interrupted = True
  405. finally:
  406. if self.ns.timeout is not None:
  407. faulthandler.cancel_dump_traceback_later()
  408. # Always ensure that all worker processes are no longer
  409. # worker when we exit this function
  410. self.pending.stop()
  411. self.stop_workers()
  412. def run_tests_multiprocess(regrtest: Regrtest) -> None:
  413. MultiprocessTestRunner(regrtest).run_tests()
  414. class EncodeTestResult(json.JSONEncoder):
  415. """Encode a TestResult (sub)class object into a JSON dict."""
  416. def default(self, o: Any) -> dict[str, Any]:
  417. if isinstance(o, TestResult):
  418. result = vars(o)
  419. result["__test_result__"] = o.__class__.__name__
  420. return result
  421. return super().default(o)
  422. def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]:
  423. """Decode a TestResult (sub)class object from a JSON dict."""
  424. if "__test_result__" not in d:
  425. return d
  426. cls_name = d.pop("__test_result__")
  427. for cls in get_all_test_result_classes():
  428. if cls.__name__ == cls_name:
  429. return cls(**d)
  430. def get_all_test_result_classes() -> set[type[TestResult]]:
  431. prev_count = 0
  432. classes = {TestResult}
  433. while len(classes) > prev_count:
  434. prev_count = len(classes)
  435. to_add = []
  436. for cls in classes:
  437. to_add.extend(cls.__subclasses__())
  438. classes.update(to_add)
  439. return classes