| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- """This script contains the actual auditing tests.
- It should not be imported directly, but should be run by the test_audit
- module with arguments identifying each test.
- """
- import contextlib
- import os
- import sys
- class TestHook:
- """Used in standard hook tests to collect any logged events.
- Should be used in a with block to ensure that it has no impact
- after the test completes.
- """
- def __init__(self, raise_on_events=None, exc_type=RuntimeError):
- self.raise_on_events = raise_on_events or ()
- self.exc_type = exc_type
- self.seen = []
- self.closed = False
- def __enter__(self, *a):
- sys.addaudithook(self)
- return self
- def __exit__(self, *a):
- self.close()
- def close(self):
- self.closed = True
- @property
- def seen_events(self):
- return [i[0] for i in self.seen]
- def __call__(self, event, args):
- if self.closed:
- return
- self.seen.append((event, args))
- if event in self.raise_on_events:
- raise self.exc_type("saw event " + event)
- # Simple helpers, since we are not in unittest here
- def assertEqual(x, y):
- if x != y:
- raise AssertionError(f"{x!r} should equal {y!r}")
- def assertIn(el, series):
- if el not in series:
- raise AssertionError(f"{el!r} should be in {series!r}")
- def assertNotIn(el, series):
- if el in series:
- raise AssertionError(f"{el!r} should not be in {series!r}")
- def assertSequenceEqual(x, y):
- if len(x) != len(y):
- raise AssertionError(f"{x!r} should equal {y!r}")
- if any(ix != iy for ix, iy in zip(x, y)):
- raise AssertionError(f"{x!r} should equal {y!r}")
- @contextlib.contextmanager
- def assertRaises(ex_type):
- try:
- yield
- assert False, f"expected {ex_type}"
- except BaseException as ex:
- if isinstance(ex, AssertionError):
- raise
- assert type(ex) is ex_type, f"{ex} should be {ex_type}"
- def test_basic():
- with TestHook() as hook:
- sys.audit("test_event", 1, 2, 3)
- assertEqual(hook.seen[0][0], "test_event")
- assertEqual(hook.seen[0][1], (1, 2, 3))
- def test_block_add_hook():
- # Raising an exception should prevent a new hook from being added,
- # but will not propagate out.
- with TestHook(raise_on_events="sys.addaudithook") as hook1:
- with TestHook() as hook2:
- sys.audit("test_event")
- assertIn("test_event", hook1.seen_events)
- assertNotIn("test_event", hook2.seen_events)
- def test_block_add_hook_baseexception():
- # Raising BaseException will propagate out when adding a hook
- with assertRaises(BaseException):
- with TestHook(
- raise_on_events="sys.addaudithook", exc_type=BaseException
- ) as hook1:
- # Adding this next hook should raise BaseException
- with TestHook() as hook2:
- pass
- def test_marshal():
- import marshal
- o = ("a", "b", "c", 1, 2, 3)
- payload = marshal.dumps(o)
- with TestHook() as hook:
- assertEqual(o, marshal.loads(marshal.dumps(o)))
- try:
- with open("test-marshal.bin", "wb") as f:
- marshal.dump(o, f)
- with open("test-marshal.bin", "rb") as f:
- assertEqual(o, marshal.load(f))
- finally:
- os.unlink("test-marshal.bin")
- actual = [(a[0], a[1]) for e, a in hook.seen if e == "marshal.dumps"]
- assertSequenceEqual(actual, [(o, marshal.version)] * 2)
- actual = [a[0] for e, a in hook.seen if e == "marshal.loads"]
- assertSequenceEqual(actual, [payload])
- actual = [e for e, a in hook.seen if e == "marshal.load"]
- assertSequenceEqual(actual, ["marshal.load"])
- def test_pickle():
- import pickle
- class PicklePrint:
- def __reduce_ex__(self, p):
- return str, ("Pwned!",)
- payload_1 = pickle.dumps(PicklePrint())
- payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3))
- # Before we add the hook, ensure our malicious pickle loads
- assertEqual("Pwned!", pickle.loads(payload_1))
- with TestHook(raise_on_events="pickle.find_class") as hook:
- with assertRaises(RuntimeError):
- # With the hook enabled, loading globals is not allowed
- pickle.loads(payload_1)
- # pickles with no globals are okay
- pickle.loads(payload_2)
- def test_monkeypatch():
- class A:
- pass
- class B:
- pass
- class C(A):
- pass
- a = A()
- with TestHook() as hook:
- # Catch name changes
- C.__name__ = "X"
- # Catch type changes
- C.__bases__ = (B,)
- # Ensure bypassing __setattr__ is still caught
- type.__dict__["__bases__"].__set__(C, (B,))
- # Catch attribute replacement
- C.__init__ = B.__init__
- # Catch attribute addition
- C.new_attr = 123
- # Catch class changes
- a.__class__ = B
- actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"]
- assertSequenceEqual(
- [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")], actual
- )
- def test_open():
- # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
- try:
- import ssl
- load_dh_params = ssl.create_default_context().load_dh_params
- except ImportError:
- load_dh_params = None
- # Try a range of "open" functions.
- # All of them should fail
- with TestHook(raise_on_events={"open"}) as hook:
- for fn, *args in [
- (open, sys.argv[2], "r"),
- (open, sys.executable, "rb"),
- (open, 3, "wb"),
- (open, sys.argv[2], "w", -1, None, None, None, False, lambda *a: 1),
- (load_dh_params, sys.argv[2]),
- ]:
- if not fn:
- continue
- with assertRaises(RuntimeError):
- fn(*args)
- actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]]
- actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]]
- assertSequenceEqual(
- [
- i
- for i in [
- (sys.argv[2], "r"),
- (sys.executable, "r"),
- (3, "w"),
- (sys.argv[2], "w"),
- (sys.argv[2], "rb") if load_dh_params else None,
- ]
- if i is not None
- ],
- actual_mode,
- )
- assertSequenceEqual([], actual_flag)
- def test_cantrace():
- traced = []
- def trace(frame, event, *args):
- if frame.f_code == TestHook.__call__.__code__:
- traced.append(event)
- old = sys.settrace(trace)
- try:
- with TestHook() as hook:
- # No traced call
- eval("1")
- # No traced call
- hook.__cantrace__ = False
- eval("2")
- # One traced call
- hook.__cantrace__ = True
- eval("3")
- # Two traced calls (writing to private member, eval)
- hook.__cantrace__ = 1
- eval("4")
- # One traced call (writing to private member)
- hook.__cantrace__ = 0
- finally:
- sys.settrace(old)
- assertSequenceEqual(["call"] * 4, traced)
- def test_mmap():
- import mmap
- with TestHook() as hook:
- mmap.mmap(-1, 8)
- assertEqual(hook.seen[0][1][:2], (-1, 8))
- def test_excepthook():
- def excepthook(exc_type, exc_value, exc_tb):
- if exc_type is not RuntimeError:
- sys.__excepthook__(exc_type, exc_value, exc_tb)
- def hook(event, args):
- if event == "sys.excepthook":
- if not isinstance(args[2], args[1]):
- raise TypeError(f"Expected isinstance({args[2]!r}, " f"{args[1]!r})")
- if args[0] != excepthook:
- raise ValueError(f"Expected {args[0]} == {excepthook}")
- print(event, repr(args[2]))
- sys.addaudithook(hook)
- sys.excepthook = excepthook
- raise RuntimeError("fatal-error")
- def test_unraisablehook():
- from _testcapi import write_unraisable_exc
- def unraisablehook(hookargs):
- pass
- def hook(event, args):
- if event == "sys.unraisablehook":
- if args[0] != unraisablehook:
- raise ValueError(f"Expected {args[0]} == {unraisablehook}")
- print(event, repr(args[1].exc_value), args[1].err_msg)
- sys.addaudithook(hook)
- sys.unraisablehook = unraisablehook
- write_unraisable_exc(RuntimeError("nonfatal-error"), "for audit hook test", None)
- def test_winreg():
- from winreg import OpenKey, EnumKey, CloseKey, HKEY_LOCAL_MACHINE
- def hook(event, args):
- if not event.startswith("winreg."):
- return
- print(event, *args)
- sys.addaudithook(hook)
- k = OpenKey(HKEY_LOCAL_MACHINE, "Software")
- EnumKey(k, 0)
- try:
- EnumKey(k, 10000)
- except OSError:
- pass
- else:
- raise RuntimeError("Expected EnumKey(HKLM, 10000) to fail")
- kv = k.Detach()
- CloseKey(kv)
- def test_socket():
- import socket
- def hook(event, args):
- if event.startswith("socket."):
- print(event, *args)
- sys.addaudithook(hook)
- socket.gethostname()
- # Don't care if this fails, we just want the audit message
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- # Don't care if this fails, we just want the audit message
- sock.bind(('127.0.0.1', 8080))
- except Exception:
- pass
- finally:
- sock.close()
- def test_gc():
- import gc
- def hook(event, args):
- if event.startswith("gc."):
- print(event, *args)
- sys.addaudithook(hook)
- gc.get_objects(generation=1)
- x = object()
- y = [x]
- gc.get_referrers(x)
- gc.get_referents(y)
- def test_http_client():
- import http.client
- def hook(event, args):
- if event.startswith("http.client."):
- print(event, *args[1:])
- sys.addaudithook(hook)
- conn = http.client.HTTPConnection('www.python.org')
- try:
- conn.request('GET', '/')
- except OSError:
- print('http.client.send', '[cannot send]')
- finally:
- conn.close()
- def test_sqlite3():
- import sqlite3
- def hook(event, *args):
- if event.startswith("sqlite3."):
- print(event, *args)
- sys.addaudithook(hook)
- cx1 = sqlite3.connect(":memory:")
- cx2 = sqlite3.Connection(":memory:")
- # Configured without --enable-loadable-sqlite-extensions
- if hasattr(sqlite3.Connection, "enable_load_extension"):
- cx1.enable_load_extension(False)
- try:
- cx1.load_extension("test")
- except sqlite3.OperationalError:
- pass
- else:
- raise RuntimeError("Expected sqlite3.load_extension to fail")
- def test_sys_getframe():
- import sys
- def hook(event, args):
- if event.startswith("sys."):
- print(event, args[0].f_code.co_name)
- sys.addaudithook(hook)
- sys._getframe()
- def test_syslog():
- import syslog
- def hook(event, args):
- if event.startswith("syslog."):
- print(event, *args)
- sys.addaudithook(hook)
- syslog.openlog('python')
- syslog.syslog('test')
- syslog.setlogmask(syslog.LOG_DEBUG)
- syslog.closelog()
- # implicit open
- syslog.syslog('test2')
- # open with default ident
- syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0)
- sys.argv = None
- syslog.openlog()
- syslog.closelog()
- def test_not_in_gc():
- import gc
- hook = lambda *a: None
- sys.addaudithook(hook)
- for o in gc.get_objects():
- if isinstance(o, list):
- assert hook not in o
- if __name__ == "__main__":
- from test.support import suppress_msvcrt_asserts
- suppress_msvcrt_asserts()
- test = sys.argv[1]
- globals()[test]()
|