# test_dtrace.py (listed size: 5.3 KB)

  1. import dis
  2. import os.path
  3. import re
  4. import subprocess
  5. import sys
  6. import types
  7. import unittest
  8. from test import support
  9. from test.support import findfile
  10. if not support.has_subprocess_support:
  11. raise unittest.SkipTest("test module requires subprocess")
  12. def abspath(filename):
  13. return os.path.abspath(findfile(filename, subdir="dtracedata"))
  14. def normalize_trace_output(output):
  15. """Normalize DTrace output for comparison.
  16. DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
  17. are concatenated. So if the operating system moves our thread around, the
  18. straight result can be "non-causal". So we add timestamps to the probe
  19. firing, sort by that field, then strip it from the output"""
  20. # When compiling with '--with-pydebug', strip '[# refs]' debug output.
  21. output = re.sub(r"\[[0-9]+ refs\]", "", output)
  22. try:
  23. result = [
  24. row.split("\t")
  25. for row in output.splitlines()
  26. if row and not row.startswith('#')
  27. ]
  28. result.sort(key=lambda row: int(row[0]))
  29. result = [row[1] for row in result]
  30. return "\n".join(result)
  31. except (IndexError, ValueError):
  32. raise AssertionError(
  33. "tracer produced unparsable output:\n{}".format(output)
  34. )
  35. class TraceBackend:
  36. EXTENSION = None
  37. COMMAND = None
  38. COMMAND_ARGS = []
  39. def run_case(self, name, optimize_python=None):
  40. actual_output = normalize_trace_output(self.trace_python(
  41. script_file=abspath(name + self.EXTENSION),
  42. python_file=abspath(name + ".py"),
  43. optimize_python=optimize_python))
  44. with open(abspath(name + self.EXTENSION + ".expected")) as f:
  45. expected_output = f.read().rstrip()
  46. return (expected_output, actual_output)
  47. def generate_trace_command(self, script_file, subcommand=None):
  48. command = self.COMMAND + [script_file]
  49. if subcommand:
  50. command += ["-c", subcommand]
  51. return command
  52. def trace(self, script_file, subcommand=None):
  53. command = self.generate_trace_command(script_file, subcommand)
  54. stdout, _ = subprocess.Popen(command,
  55. stdout=subprocess.PIPE,
  56. stderr=subprocess.STDOUT,
  57. universal_newlines=True).communicate()
  58. return stdout
  59. def trace_python(self, script_file, python_file, optimize_python=None):
  60. python_flags = []
  61. if optimize_python:
  62. python_flags.extend(["-O"] * optimize_python)
  63. subcommand = " ".join([sys.executable] + python_flags + [python_file])
  64. return self.trace(script_file, subcommand)
  65. def assert_usable(self):
  66. try:
  67. output = self.trace(abspath("assert_usable" + self.EXTENSION))
  68. output = output.strip()
  69. except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe:
  70. output = str(fnfe)
  71. if output != "probe: success":
  72. raise unittest.SkipTest(
  73. "{}(1) failed: {}".format(self.COMMAND[0], output)
  74. )
  75. class DTraceBackend(TraceBackend):
  76. EXTENSION = ".d"
  77. COMMAND = ["dtrace", "-q", "-s"]
  78. class SystemTapBackend(TraceBackend):
  79. EXTENSION = ".stp"
  80. COMMAND = ["stap", "-g"]
  81. class TraceTests:
  82. # unittest.TestCase options
  83. maxDiff = None
  84. # TraceTests options
  85. backend = None
  86. optimize_python = 0
  87. @classmethod
  88. def setUpClass(self):
  89. self.backend.assert_usable()
  90. def run_case(self, name):
  91. actual_output, expected_output = self.backend.run_case(
  92. name, optimize_python=self.optimize_python)
  93. self.assertEqual(actual_output, expected_output)
  94. def test_function_entry_return(self):
  95. self.run_case("call_stack")
  96. def test_verify_call_opcodes(self):
  97. """Ensure our call stack test hits all function call opcodes"""
  98. opcodes = set(["CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"])
  99. with open(abspath("call_stack.py")) as f:
  100. code_string = f.read()
  101. def get_function_instructions(funcname):
  102. # Recompile with appropriate optimization setting
  103. code = compile(source=code_string,
  104. filename="<string>",
  105. mode="exec",
  106. optimize=self.optimize_python)
  107. for c in code.co_consts:
  108. if isinstance(c, types.CodeType) and c.co_name == funcname:
  109. return dis.get_instructions(c)
  110. return []
  111. for instruction in get_function_instructions('start'):
  112. opcodes.discard(instruction.opname)
  113. self.assertEqual(set(), opcodes)
  114. def test_gc(self):
  115. self.run_case("gc")
  116. def test_line(self):
  117. self.run_case("line")
  118. class DTraceNormalTests(TraceTests, unittest.TestCase):
  119. backend = DTraceBackend()
  120. optimize_python = 0
  121. class DTraceOptimizedTests(TraceTests, unittest.TestCase):
  122. backend = DTraceBackend()
  123. optimize_python = 2
  124. class SystemTapNormalTests(TraceTests, unittest.TestCase):
  125. backend = SystemTapBackend()
  126. optimize_python = 0
  127. class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
  128. backend = SystemTapBackend()
  129. optimize_python = 2
  130. if __name__ == '__main__':
  131. unittest.main()