| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- """Tests for sys.audit and sys.addaudithook
- """
- import subprocess
- import sys
- import unittest
- from test import support
- from test.support import import_helper
- from test.support import os_helper
- if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
- raise unittest.SkipTest("test only relevant when sys.audit is available")
- AUDIT_TESTS_PY = support.findfile("audit-tests.py")
- class AuditTest(unittest.TestCase):
- maxDiff = None
- @support.requires_subprocess()
- def do_test(self, *args):
- with subprocess.Popen(
- [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
- encoding="utf-8",
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- ) as p:
- p.wait()
- sys.stdout.writelines(p.stdout)
- sys.stderr.writelines(p.stderr)
- if p.returncode:
- self.fail("".join(p.stderr))
- @support.requires_subprocess()
- def run_python(self, *args):
- events = []
- with subprocess.Popen(
- [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
- encoding="utf-8",
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- ) as p:
- p.wait()
- sys.stderr.writelines(p.stderr)
- return (
- p.returncode,
- [line.strip().partition(" ") for line in p.stdout],
- "".join(p.stderr),
- )
- def test_basic(self):
- self.do_test("test_basic")
- def test_block_add_hook(self):
- self.do_test("test_block_add_hook")
- def test_block_add_hook_baseexception(self):
- self.do_test("test_block_add_hook_baseexception")
- def test_marshal(self):
- import_helper.import_module("marshal")
- self.do_test("test_marshal")
- def test_pickle(self):
- import_helper.import_module("pickle")
- self.do_test("test_pickle")
- def test_monkeypatch(self):
- self.do_test("test_monkeypatch")
- def test_open(self):
- self.do_test("test_open", os_helper.TESTFN)
- def test_cantrace(self):
- self.do_test("test_cantrace")
- def test_mmap(self):
- self.do_test("test_mmap")
- def test_excepthook(self):
- returncode, events, stderr = self.run_python("test_excepthook")
- if not returncode:
- self.fail(f"Expected fatal exception\n{stderr}")
- self.assertSequenceEqual(
- [("sys.excepthook", " ", "RuntimeError('fatal-error')")], events
- )
- def test_unraisablehook(self):
- returncode, events, stderr = self.run_python("test_unraisablehook")
- if returncode:
- self.fail(stderr)
- self.assertEqual(events[0][0], "sys.unraisablehook")
- self.assertEqual(
- events[0][2],
- "RuntimeError('nonfatal-error') Exception ignored for audit hook test",
- )
- def test_winreg(self):
- import_helper.import_module("winreg")
- returncode, events, stderr = self.run_python("test_winreg")
- if returncode:
- self.fail(stderr)
- self.assertEqual(events[0][0], "winreg.OpenKey")
- self.assertEqual(events[1][0], "winreg.OpenKey/result")
- expected = events[1][2]
- self.assertTrue(expected)
- self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 0"], events[2])
- self.assertSequenceEqual(["winreg.EnumKey", " ", f"{expected} 10000"], events[3])
- self.assertSequenceEqual(["winreg.PyHKEY.Detach", " ", expected], events[4])
- def test_socket(self):
- import_helper.import_module("socket")
- returncode, events, stderr = self.run_python("test_socket")
- if returncode:
- self.fail(stderr)
- if support.verbose:
- print(*events, sep='\n')
- self.assertEqual(events[0][0], "socket.gethostname")
- self.assertEqual(events[1][0], "socket.__new__")
- self.assertEqual(events[2][0], "socket.bind")
- self.assertTrue(events[2][2].endswith("('127.0.0.1', 8080)"))
- def test_gc(self):
- returncode, events, stderr = self.run_python("test_gc")
- if returncode:
- self.fail(stderr)
- if support.verbose:
- print(*events, sep='\n')
- self.assertEqual(
- [event[0] for event in events],
- ["gc.get_objects", "gc.get_referrers", "gc.get_referents"]
- )
- def test_http(self):
- import_helper.import_module("http.client")
- returncode, events, stderr = self.run_python("test_http_client")
- if returncode:
- self.fail(stderr)
- if support.verbose:
- print(*events, sep='\n')
- self.assertEqual(events[0][0], "http.client.connect")
- self.assertEqual(events[0][2], "www.python.org 80")
- self.assertEqual(events[1][0], "http.client.send")
- if events[1][2] != '[cannot send]':
- self.assertIn('HTTP', events[1][2])
- def test_sqlite3(self):
- sqlite3 = import_helper.import_module("sqlite3")
- returncode, events, stderr = self.run_python("test_sqlite3")
- if returncode:
- self.fail(stderr)
- if support.verbose:
- print(*events, sep='\n')
- actual = [ev[0] for ev in events]
- expected = ["sqlite3.connect", "sqlite3.connect/handle"] * 2
- if hasattr(sqlite3.Connection, "enable_load_extension"):
- expected += [
- "sqlite3.enable_load_extension",
- "sqlite3.load_extension",
- ]
- self.assertEqual(actual, expected)
- def test_sys_getframe(self):
- returncode, events, stderr = self.run_python("test_sys_getframe")
- if returncode:
- self.fail(stderr)
- if support.verbose:
- print(*events, sep='\n')
- actual = [(ev[0], ev[2]) for ev in events]
- expected = [("sys._getframe", "test_sys_getframe")]
- self.assertEqual(actual, expected)
- def test_syslog(self):
- syslog = import_helper.import_module("syslog")
- returncode, events, stderr = self.run_python("test_syslog")
- if returncode:
- self.fail(stderr)
- if support.verbose:
- print('Events:', *events, sep='\n ')
- self.assertSequenceEqual(
- events,
- [('syslog.openlog', ' ', f'python 0 {syslog.LOG_USER}'),
- ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test'),
- ('syslog.setlogmask', ' ', f'{syslog.LOG_DEBUG}'),
- ('syslog.closelog', '', ''),
- ('syslog.syslog', ' ', f'{syslog.LOG_INFO} test2'),
- ('syslog.openlog', ' ', f'audit-tests.py 0 {syslog.LOG_USER}'),
- ('syslog.openlog', ' ', f'audit-tests.py {syslog.LOG_NDELAY} {syslog.LOG_LOCAL0}'),
- ('syslog.openlog', ' ', f'None 0 {syslog.LOG_USER}'),
- ('syslog.closelog', '', '')]
- )
- def test_not_in_gc(self):
- returncode, _, stderr = self.run_python("test_not_in_gc")
- if returncode:
- self.fail(stderr)
- if __name__ == "__main__":
- unittest.main()
|