| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- import gc
- import re
- import sys
- import textwrap
- import threading
- import types
- import unittest
- import weakref
- from test import support
- from test.support import threading_helper
- from test.support.script_helper import assert_python_ok
- class ClearTest(unittest.TestCase):
- """
- Tests for frame.clear().
- """
- def inner(self, x=5, **kwargs):
- 1/0
- def outer(self, **kwargs):
- try:
- self.inner(**kwargs)
- except ZeroDivisionError as e:
- exc = e
- return exc
- def clear_traceback_frames(self, tb):
- """
- Clear all frames in a traceback.
- """
- while tb is not None:
- tb.tb_frame.clear()
- tb = tb.tb_next
- def test_clear_locals(self):
- class C:
- pass
- c = C()
- wr = weakref.ref(c)
- exc = self.outer(c=c)
- del c
- support.gc_collect()
- # A reference to c is held through the frames
- self.assertIsNot(None, wr())
- self.clear_traceback_frames(exc.__traceback__)
- support.gc_collect()
- # The reference was released by .clear()
- self.assertIs(None, wr())
- def test_clear_does_not_clear_specials(self):
- class C:
- pass
- c = C()
- exc = self.outer(c=c)
- del c
- f = exc.__traceback__.tb_frame
- f.clear()
- self.assertIsNot(f.f_code, None)
- self.assertIsNot(f.f_locals, None)
- self.assertIsNot(f.f_builtins, None)
- self.assertIsNot(f.f_globals, None)
- def test_clear_generator(self):
- endly = False
- def g():
- nonlocal endly
- try:
- yield
- self.inner()
- finally:
- endly = True
- gen = g()
- next(gen)
- self.assertFalse(endly)
- # Clearing the frame closes the generator
- gen.gi_frame.clear()
- self.assertTrue(endly)
- def test_clear_executing(self):
- # Attempting to clear an executing frame is forbidden.
- try:
- 1/0
- except ZeroDivisionError as e:
- f = e.__traceback__.tb_frame
- with self.assertRaises(RuntimeError):
- f.clear()
- with self.assertRaises(RuntimeError):
- f.f_back.clear()
- def test_clear_executing_generator(self):
- # Attempting to clear an executing generator frame is forbidden.
- endly = False
- def g():
- nonlocal endly
- try:
- 1/0
- except ZeroDivisionError as e:
- f = e.__traceback__.tb_frame
- with self.assertRaises(RuntimeError):
- f.clear()
- with self.assertRaises(RuntimeError):
- f.f_back.clear()
- yield f
- finally:
- endly = True
- gen = g()
- f = next(gen)
- self.assertFalse(endly)
- # Clearing the frame closes the generator
- f.clear()
- self.assertTrue(endly)
- def test_lineno_with_tracing(self):
- def record_line():
- f = sys._getframe(1)
- lines.append(f.f_lineno-f.f_code.co_firstlineno)
- def test(trace):
- record_line()
- if trace:
- sys._getframe(0).f_trace = True
- record_line()
- record_line()
- expected_lines = [1, 4, 5]
- lines = []
- test(False)
- self.assertEqual(lines, expected_lines)
- lines = []
- test(True)
- self.assertEqual(lines, expected_lines)
- @support.cpython_only
- def test_clear_refcycles(self):
- # .clear() doesn't leave any refcycle behind
- with support.disable_gc():
- class C:
- pass
- c = C()
- wr = weakref.ref(c)
- exc = self.outer(c=c)
- del c
- self.assertIsNot(None, wr())
- self.clear_traceback_frames(exc.__traceback__)
- self.assertIs(None, wr())
- class FrameAttrsTest(unittest.TestCase):
- def make_frames(self):
- def outer():
- x = 5
- y = 6
- def inner():
- z = x + 2
- 1/0
- t = 9
- return inner()
- try:
- outer()
- except ZeroDivisionError as e:
- tb = e.__traceback__
- frames = []
- while tb:
- frames.append(tb.tb_frame)
- tb = tb.tb_next
- return frames
- def test_locals(self):
- f, outer, inner = self.make_frames()
- outer_locals = outer.f_locals
- self.assertIsInstance(outer_locals.pop('inner'), types.FunctionType)
- self.assertEqual(outer_locals, {'x': 5, 'y': 6})
- inner_locals = inner.f_locals
- self.assertEqual(inner_locals, {'x': 5, 'z': 7})
- def test_clear_locals(self):
- # Test f_locals after clear() (issue #21897)
- f, outer, inner = self.make_frames()
- outer.clear()
- inner.clear()
- self.assertEqual(outer.f_locals, {})
- self.assertEqual(inner.f_locals, {})
- def test_locals_clear_locals(self):
- # Test f_locals before and after clear() (to exercise caching)
- f, outer, inner = self.make_frames()
- outer.f_locals
- inner.f_locals
- outer.clear()
- inner.clear()
- self.assertEqual(outer.f_locals, {})
- self.assertEqual(inner.f_locals, {})
- def test_f_lineno_del_segfault(self):
- f, _, _ = self.make_frames()
- with self.assertRaises(AttributeError):
- del f.f_lineno
- class ReprTest(unittest.TestCase):
- """
- Tests for repr(frame).
- """
- def test_repr(self):
- def outer():
- x = 5
- y = 6
- def inner():
- z = x + 2
- 1/0
- t = 9
- return inner()
- offset = outer.__code__.co_firstlineno
- try:
- outer()
- except ZeroDivisionError as e:
- tb = e.__traceback__
- frames = []
- while tb:
- frames.append(tb.tb_frame)
- tb = tb.tb_next
- else:
- self.fail("should have raised")
- f_this, f_outer, f_inner = frames
- file_repr = re.escape(repr(__file__))
- self.assertRegex(repr(f_this),
- r"^<frame at 0x[0-9a-fA-F]+, file %s, line %d, code test_repr>$"
- % (file_repr, offset + 23))
- self.assertRegex(repr(f_outer),
- r"^<frame at 0x[0-9a-fA-F]+, file %s, line %d, code outer>$"
- % (file_repr, offset + 7))
- self.assertRegex(repr(f_inner),
- r"^<frame at 0x[0-9a-fA-F]+, file %s, line %d, code inner>$"
- % (file_repr, offset + 5))
- class TestIncompleteFrameAreInvisible(unittest.TestCase):
- def test_issue95818(self):
- # See GH-95818 for details
- code = textwrap.dedent(f"""
- import gc
- gc.set_threshold(1,1,1)
- class GCHello:
- def __del__(self):
- print("Destroyed from gc")
- def gen():
- yield
- fd = open({__file__!r})
- l = [fd, GCHello()]
- l.append(l)
- del fd
- del l
- gen()
- """)
- assert_python_ok("-c", code)
- @support.cpython_only
- def test_sneaky_frame_object(self):
- def trace(frame, event, arg):
- """
- Don't actually do anything, just force a frame object to be created.
- """
- def callback(phase, info):
- """
- Yo dawg, I heard you like frames, so I'm allocating a frame while
- you're allocating a frame, so you can have a frame while you have a
- frame!
- """
- nonlocal sneaky_frame_object
- sneaky_frame_object = sys._getframe().f_back
- # We're done here:
- gc.callbacks.remove(callback)
- def f():
- while True:
- yield
- old_threshold = gc.get_threshold()
- old_callbacks = gc.callbacks[:]
- old_enabled = gc.isenabled()
- old_trace = sys.gettrace()
- try:
- # Stop the GC for a second while we set things up:
- gc.disable()
- # Create a paused generator:
- g = f()
- next(g)
- # Move all objects to the oldest generation, and tell the GC to run
- # on the *very next* allocation:
- gc.collect()
- gc.set_threshold(1, 0, 0)
- # Okay, so here's the nightmare scenario:
- # - We're tracing the resumption of a generator, which creates a new
- # frame object.
- # - The allocation of this frame object triggers a collection
- # *before* the frame object is actually created.
- # - During the collection, we request the exact same frame object.
- # This test does it with a GC callback, but in real code it would
- # likely be a trace function, weakref callback, or finalizer.
- # - The collection finishes, and the original frame object is
- # created. We now have two frame objects fighting over ownership
- # of the same interpreter frame!
- sys.settrace(trace)
- gc.callbacks.append(callback)
- sneaky_frame_object = None
- gc.enable()
- next(g)
- # g.gi_frame should be the the frame object from the callback (the
- # one that was *requested* second, but *created* first):
- self.assertIs(g.gi_frame, sneaky_frame_object)
- finally:
- gc.set_threshold(*old_threshold)
- gc.callbacks[:] = old_callbacks
- sys.settrace(old_trace)
- if old_enabled:
- gc.enable()
- @support.cpython_only
- @threading_helper.requires_working_threading()
- def test_sneaky_frame_object_teardown(self):
- class SneakyDel:
- def __del__(self):
- """
- Stash a reference to the entire stack for walking later.
- It may look crazy, but you'd be surprised how common this is
- when using a test runner (like pytest). The typical recipe is:
- ResourceWarning + -Werror + a custom sys.unraisablehook.
- """
- nonlocal sneaky_frame_object
- sneaky_frame_object = sys._getframe()
- class SneakyThread(threading.Thread):
- """
- A separate thread isn't needed to make this code crash, but it does
- make crashes more consistent, since it means sneaky_frame_object is
- backed by freed memory after the thread completes!
- """
- def run(self):
- """Run SneakyDel.__del__ as this frame is popped."""
- ref = SneakyDel()
- sneaky_frame_object = None
- t = SneakyThread()
- t.start()
- t.join()
- # sneaky_frame_object can be anything, really, but it's crucial that
- # SneakyThread.run's frame isn't anywhere on the stack while it's being
- # torn down:
- self.assertIsNotNone(sneaky_frame_object)
- while sneaky_frame_object is not None:
- self.assertIsNot(
- sneaky_frame_object.f_code, SneakyThread.run.__code__
- )
- sneaky_frame_object = sneaky_frame_object.f_back
- if __name__ == "__main__":
- unittest.main()
|