# test_audit.py (6.9 KB)
"""Tests for sys.audit and sys.addaudithook
"""
  3. import subprocess
  4. import sys
  5. import unittest
  6. from test import support
  7. from test.support import import_helper
  8. from test.support import os_helper
  9. if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
  10. raise unittest.SkipTest("test only relevant when sys.audit is available")
  11. AUDIT_TESTS_PY = support.findfile("audit-tests.py")
  12. class AuditTest(unittest.TestCase):
  13. maxDiff = None
  14. @support.requires_subprocess()
  15. def do_test(self, *args):
  16. with subprocess.Popen(
  17. [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
  18. encoding="utf-8",
  19. stdout=subprocess.PIPE,
  20. stderr=subprocess.PIPE,
  21. ) as p:
  22. p.wait()
  23. sys.stdout.writelines(p.stdout)
  24. sys.stderr.writelines(p.stderr)
  25. if p.returncode:
  26. self.fail("".join(p.stderr))
  27. @support.requires_subprocess()
  28. def run_python(self, *args):
  29. events = []
  30. with subprocess.Popen(
  31. [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
  32. encoding="utf-8",
  33. stdout=subprocess.PIPE,
  34. stderr=subprocess.PIPE,
  35. ) as p:
  36. p.wait()
  37. sys.stderr.writelines(p.stderr)
  38. return (
  39. p.returncode,
  40. [line.strip().partition(" ") for line in p.stdout],
  41. "".join(p.stderr),
  42. )
  43. def test_basic(self):
  44. self.do_test("test_basic")
  45. def test_block_add_hook(self):
  46. self.do_test("test_block_add_hook")
  47. def test_block_add_hook_baseexception(self):
  48. self.do_test("test_block_add_hook_baseexception")
  49. def test_marshal(self):
  50. import_helper.import_module("marshal")
  51. self.do_test("test_marshal")
  52. def test_pickle(self):
  53. import_helper.import_module("pickle")
  54. self.do_test("test_pickle")
  55. def test_monkeypatch(self):
  56. self.do_test("test_monkeypatch")
  57. def test_open(self):
  58. self.do_test("test_open", os_helper.TESTFN)
  59. def test_cantrace(self):
  60. self.do_test("test_cantrace")
  61. def test_mmap(self):
  62. self.do_test("test_mmap")
  63. def test_excepthook(self):
  64. returncode, events, stderr = self.run_python("test_excepthook")
  65. if not returncode:
  66. self.fail(f"Expected fatal exception\n{stderr}")
  67. self.assertSequenceEqual(
  68. [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
  69. )
  70. def test_unraisablehook(self):
  71. returncode, events, stderr = self.run_python("test_unraisablehook")
  72. if returncode:
  73. self.fail(stderr)
  74. self.assertEqual(events[0][0], "sys.unraisablehook")
  75. self.assertEqual(
  76. events[0][2],
  77. "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
  78. )
  79. def test_winreg(self):
  80. import_helper.import_module("winreg")
  81. returncode, events, stderr = self.run_python("test_winreg")
  82. if returncode:
  83. self.fail(stderr)
  84. self.assertEqual(events[0][0], "winreg.OpenKey")
  85. self.assertEqual(events[1][0], "winreg.OpenKey/result")
  86. expected = events[1][2]
  87. self.assertTrue(expected)
  88. self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
  89. self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
  90. self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])
  91. def test_socket(self):
  92. import_helper.import_module("socket")
  93. returncode, events, stderr = self.run_python("test_socket")
  94. if returncode:
  95. self.fail(stderr)
  96. if support.verbose:
  97. print(*events, sep='\n')
  98. self.assertEqual(events[0][0], "socket.gethostname")
  99. self.assertEqual(events[1][0], "socket.__new__")
  100. self.assertEqual(events[2][0], "socket.bind")
  101. self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
  102. def test_gc(self):
  103. returncode, events, stderr = self.run_python("test_gc")
  104. if returncode:
  105. self.fail(stderr)
  106. if support.verbose:
  107. print(*events, sep='\n')
  108. self.assertEqual(
  109. [event[0] for event in events],
  110. ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
  111. )
  112. def test_http(self):
  113. import_helper.import_module("http.client")
  114. returncode, events, stderr = self.run_python("test_http_client")
  115. if returncode:
  116. self.fail(stderr)
  117. if support.verbose:
  118. print(*events, sep='\n')
  119. self.assertEqual(events[0][0], "http.client.connect")
  120. self.assertEqual(events[0][2], "www.python.org 80")
  121. self.assertEqual(events[1][0], "http.client.send")
  122. if events[1][2] != '[cannot send]':
  123. self.assertIn('HTTP', events[1][2])
  124. def test_sqlite3(self):
  125. sqlite3 = import_helper.import_module("sqlite3")
  126. returncode, events, stderr = self.run_python("test_sqlite3")
  127. if returncode:
  128. self.fail(stderr)
  129. if support.verbose:
  130. print(*events, sep='\n')
  131. actual = [ev[0] for ev in events]
  132. expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2
  133. if hasattr(sqlite3.Connection, "enable_load_extension"):
  134. expected += [
  135. "sqlite3.enable_load_extension",
  136. "sqlite3.load_extension",
  137. ]
  138. self.assertEqual(actual, expected)
  139. def test_sys_getframe(self):
  140. returncode, events, stderr = self.run_python("test_sys_getframe")
  141. if returncode:
  142. self.fail(stderr)
  143. if support.verbose:
  144. print(*events, sep='\n')
  145. actual = [(ev[0], ev[2]) for ev in events]
  146. expected = [("sys._getframe", "test_sys_getframe")]
  147. self.assertEqual(actual, expected)
  148. def test_syslog(self):
  149. syslog = import_helper.import_module("syslog")
  150. returncode, events, stderr = self.run_python("test_syslog")
  151. if returncode:
  152. self.fail(stderr)
  153. if support.verbose:
  154. print('Events:', *events, sep='\n ')
  155. self.assertSequenceEqual(
  156. events,
  157. [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
  158. ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
  159. ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
  160. ('syslog.closelog', '', ''),
  161. ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
  162. ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
  163. ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
  164. ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
  165. ('syslog.closelog', '', '')]
  166. )
  167. def test_not_in_gc(self):
  168. returncode, _, stderr = self.run_python("test_not_in_gc")
  169. if returncode:
  170. self.fail(stderr)
  171. if __name__ == "__main__":
  172. unittest.main()