| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483 |
- import unittest
- import unittest.mock
- from test.support import (verbose, refcount_test,
- cpython_only, requires_subprocess)
- from test.support.import_helper import import_module
- from test.support.os_helper import temp_dir, TESTFN, unlink
- from test.support.script_helper import assert_python_ok, make_script
- from test.support import threading_helper
- import gc
- import sys
- import sysconfig
- import textwrap
- import threading
- import time
- import weakref
- try:
- from _testcapi import with_tp_del
- except ImportError:
- def with_tp_del(cls):
- class C(object):
- def __new__(cls, *args, **kwargs):
- raise TypeError('requires _testcapi.with_tp_del')
- return C
- try:
- from _testcapi import ContainerNoGC
- except ImportError:
- ContainerNoGC = None
- ### Support code
- ###############################################################################
- # Bug 1055820 has several tests of longstanding bugs involving weakrefs and
- # cyclic gc.
- # An instance of C1055820 has a self-loop, so becomes cyclic trash when
- # unreachable.
- class C1055820(object):
- def __init__(self, i):
- self.i = i
- self.loop = self
- class GC_Detector(object):
- # Create an instance I. Then gc hasn't happened again so long as
- # I.gc_happened is false.
- def __init__(self):
- self.gc_happened = False
- def it_happened(ignored):
- self.gc_happened = True
- # Create a piece of cyclic trash that triggers it_happened when
- # gc collects it.
- self.wr = weakref.ref(C1055820(666), it_happened)
- @with_tp_del
- class Uncollectable(object):
- """Create a reference cycle with multiple __del__ methods.
- An object in a reference cycle will never have zero references,
- and so must be garbage collected. If one or more objects in the
- cycle have __del__ methods, the gc refuses to guess an order,
- and leaves the cycle uncollected."""
- def __init__(self, partner=None):
- if partner is None:
- self.partner = Uncollectable(partner=self)
- else:
- self.partner = partner
- def __tp_del__(self):
- pass
- if sysconfig.get_config_vars().get('PY_CFLAGS', ''):
- BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS'])
- else:
- # Usually, sys.gettotalrefcount() is only present if Python has been
- # compiled in debug mode. If it's missing, expect that Python has
- # been released in release mode: with NDEBUG defined.
- BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount'))
- ### Tests
- ###############################################################################
- class GCTests(unittest.TestCase):
- def test_list(self):
- l = []
- l.append(l)
- gc.collect()
- del l
- self.assertEqual(gc.collect(), 1)
- def test_dict(self):
- d = {}
- d[1] = d
- gc.collect()
- del d
- self.assertEqual(gc.collect(), 1)
- def test_tuple(self):
- # since tuples are immutable we close the loop with a list
- l = []
- t = (l,)
- l.append(t)
- gc.collect()
- del t
- del l
- self.assertEqual(gc.collect(), 2)
- def test_class(self):
- class A:
- pass
- A.a = A
- gc.collect()
- del A
- self.assertNotEqual(gc.collect(), 0)
- def test_newstyleclass(self):
- class A(object):
- pass
- gc.collect()
- del A
- self.assertNotEqual(gc.collect(), 0)
- def test_instance(self):
- class A:
- pass
- a = A()
- a.a = a
- gc.collect()
- del a
- self.assertNotEqual(gc.collect(), 0)
- def test_newinstance(self):
- class A(object):
- pass
- a = A()
- a.a = a
- gc.collect()
- del a
- self.assertNotEqual(gc.collect(), 0)
- class B(list):
- pass
- class C(B, A):
- pass
- a = C()
- a.a = a
- gc.collect()
- del a
- self.assertNotEqual(gc.collect(), 0)
- del B, C
- self.assertNotEqual(gc.collect(), 0)
- A.a = A()
- del A
- self.assertNotEqual(gc.collect(), 0)
- self.assertEqual(gc.collect(), 0)
- def test_method(self):
- # Tricky: self.__init__ is a bound method, it references the instance.
- class A:
- def __init__(self):
- self.init = self.__init__
- a = A()
- gc.collect()
- del a
- self.assertNotEqual(gc.collect(), 0)
- @cpython_only
- def test_legacy_finalizer(self):
- # A() is uncollectable if it is part of a cycle, make sure it shows up
- # in gc.garbage.
- @with_tp_del
- class A:
- def __tp_del__(self): pass
- class B:
- pass
- a = A()
- a.a = a
- id_a = id(a)
- b = B()
- b.b = b
- gc.collect()
- del a
- del b
- self.assertNotEqual(gc.collect(), 0)
- for obj in gc.garbage:
- if id(obj) == id_a:
- del obj.a
- break
- else:
- self.fail("didn't find obj in garbage (finalizer)")
- gc.garbage.remove(obj)
- @cpython_only
- def test_legacy_finalizer_newclass(self):
- # A() is uncollectable if it is part of a cycle, make sure it shows up
- # in gc.garbage.
- @with_tp_del
- class A(object):
- def __tp_del__(self): pass
- class B(object):
- pass
- a = A()
- a.a = a
- id_a = id(a)
- b = B()
- b.b = b
- gc.collect()
- del a
- del b
- self.assertNotEqual(gc.collect(), 0)
- for obj in gc.garbage:
- if id(obj) == id_a:
- del obj.a
- break
- else:
- self.fail("didn't find obj in garbage (finalizer)")
- gc.garbage.remove(obj)
- def test_function(self):
- # Tricky: f -> d -> f, code should call d.clear() after the exec to
- # break the cycle.
- d = {}
- exec("def f(): pass\n", d)
- gc.collect()
- del d
- self.assertEqual(gc.collect(), 2)
- def test_function_tp_clear_leaves_consistent_state(self):
- # https://github.com/python/cpython/issues/91636
- code = """if 1:
- import gc
- import weakref
- class LateFin:
- __slots__ = ('ref',)
- def __del__(self):
- # 8. Now `latefin`'s finalizer is called. Here we
- # obtain a reference to `func`, which is currently
- # undergoing `tp_clear`.
- global func
- func = self.ref()
- class Cyclic(tuple):
- __slots__ = ()
- # 4. The finalizers of all garbage objects are called. In
- # this case this is only us as `func` doesn't have a
- # finalizer.
- def __del__(self):
- # 5. Create a weakref to `func` now. If we had created
- # it earlier, it would have been cleared by the
- # garbage collector before calling the finalizers.
- self[1].ref = weakref.ref(self[0])
- # 6. Drop the global reference to `latefin`. The only
- # remaining reference is the one we have.
- global latefin
- del latefin
- # 7. Now `func` is `tp_clear`-ed. This drops the last
- # reference to `Cyclic`, which gets `tp_dealloc`-ed.
- # This drops the last reference to `latefin`.
- latefin = LateFin()
- def func():
- pass
- cyc = tuple.__new__(Cyclic, (func, latefin))
- # 1. Create a reference cycle of `cyc` and `func`.
- func.__module__ = cyc
- # 2. Make the cycle unreachable, but keep the global reference
- # to `latefin` so that it isn't detected as garbage. This
- # way its finalizer will not be called immediately.
- del func, cyc
- # 3. Invoke garbage collection,
- # which will find `cyc` and `func` as garbage.
- gc.collect()
- # 9. Previously, this would crash because `func_qualname`
- # had been NULL-ed out by func_clear().
- print(f"{func=}")
- """
- # We're mostly just checking that this doesn't crash.
- rc, stdout, stderr = assert_python_ok("-c", code)
- self.assertEqual(rc, 0)
- self.assertRegex(stdout, rb"""\A\s*func=<function at \S+>\s*\Z""")
- self.assertFalse(stderr)
- @refcount_test
- def test_frame(self):
- def f():
- frame = sys._getframe()
- gc.collect()
- f()
- self.assertEqual(gc.collect(), 1)
- def test_saveall(self):
- # Verify that cyclic garbage like lists show up in gc.garbage if the
- # SAVEALL option is enabled.
- # First make sure we don't save away other stuff that just happens to
- # be waiting for collection.
- gc.collect()
- # if this fails, someone else created immortal trash
- self.assertEqual(gc.garbage, [])
- L = []
- L.append(L)
- id_L = id(L)
- debug = gc.get_debug()
- gc.set_debug(debug | gc.DEBUG_SAVEALL)
- del L
- gc.collect()
- gc.set_debug(debug)
- self.assertEqual(len(gc.garbage), 1)
- obj = gc.garbage.pop()
- self.assertEqual(id(obj), id_L)
- def test_del(self):
- # __del__ methods can trigger collection, make this to happen
- thresholds = gc.get_threshold()
- gc.enable()
- gc.set_threshold(1)
- class A:
- def __del__(self):
- dir(self)
- a = A()
- del a
- gc.disable()
- gc.set_threshold(*thresholds)
- def test_del_newclass(self):
- # __del__ methods can trigger collection, make this to happen
- thresholds = gc.get_threshold()
- gc.enable()
- gc.set_threshold(1)
- class A(object):
- def __del__(self):
- dir(self)
- a = A()
- del a
- gc.disable()
- gc.set_threshold(*thresholds)
- # The following two tests are fragile:
- # They precisely count the number of allocations,
- # which is highly implementation-dependent.
- # For example, disposed tuples are not freed, but reused.
- # To minimize variations, though, we first store the get_count() results
- # and check them at the end.
- @refcount_test
- def test_get_count(self):
- gc.collect()
- a, b, c = gc.get_count()
- x = []
- d, e, f = gc.get_count()
- self.assertEqual((b, c), (0, 0))
- self.assertEqual((e, f), (0, 0))
- # This is less fragile than asserting that a equals 0.
- self.assertLess(a, 5)
- # Between the two calls to get_count(), at least one object was
- # created (the list).
- self.assertGreater(d, a)
- @refcount_test
- def test_collect_generations(self):
- gc.collect()
- # This object will "trickle" into generation N + 1 after
- # each call to collect(N)
- x = []
- gc.collect(0)
- # x is now in gen 1
- a, b, c = gc.get_count()
- gc.collect(1)
- # x is now in gen 2
- d, e, f = gc.get_count()
- gc.collect(2)
- # x is now in gen 3
- g, h, i = gc.get_count()
- # We don't check a, d, g since their exact values depends on
- # internal implementation details of the interpreter.
- self.assertEqual((b, c), (1, 0))
- self.assertEqual((e, f), (0, 1))
- self.assertEqual((h, i), (0, 0))
- def test_trashcan(self):
- class Ouch:
- n = 0
- def __del__(self):
- Ouch.n = Ouch.n + 1
- if Ouch.n % 17 == 0:
- gc.collect()
- # "trashcan" is a hack to prevent stack overflow when deallocating
- # very deeply nested tuples etc. It works in part by abusing the
- # type pointer and refcount fields, and that can yield horrible
- # problems when gc tries to traverse the structures.
- # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
- # most likely die via segfault.
- # Note: In 2.3 the possibility for compiling without cyclic gc was
- # removed, and that in turn allows the trashcan mechanism to work
- # via much simpler means (e.g., it never abuses the type pointer or
- # refcount fields anymore). Since it's much less likely to cause a
- # problem now, the various constants in this expensive (we force a lot
- # of full collections) test are cut back from the 2.2 version.
- gc.enable()
- N = 150
- for count in range(2):
- t = []
- for i in range(N):
- t = [t, Ouch()]
- u = []
- for i in range(N):
- u = [u, Ouch()]
- v = {}
- for i in range(N):
- v = {1: v, 2: Ouch()}
- gc.disable()
- @threading_helper.requires_working_threading()
- def test_trashcan_threads(self):
- # Issue #13992: trashcan mechanism should be thread-safe
- NESTING = 60
- N_THREADS = 2
- def sleeper_gen():
- """A generator that releases the GIL when closed or dealloc'ed."""
- try:
- yield
- finally:
- time.sleep(0.000001)
- class C(list):
- # Appending to a list is atomic, which avoids the use of a lock.
- inits = []
- dels = []
- def __init__(self, alist):
- self[:] = alist
- C.inits.append(None)
- def __del__(self):
- # This __del__ is called by subtype_dealloc().
- C.dels.append(None)
- # `g` will release the GIL when garbage-collected. This
- # helps assert subtype_dealloc's behaviour when threads
- # switch in the middle of it.
- g = sleeper_gen()
- next(g)
- # Now that __del__ is finished, subtype_dealloc will proceed
- # to call list_dealloc, which also uses the trashcan mechanism.
- def make_nested():
- """Create a sufficiently nested container object so that the
- trashcan mechanism is invoked when deallocating it."""
- x = C([])
- for i in range(NESTING):
- x = [C([x])]
- del x
- def run_thread():
- """Exercise make_nested() in a loop."""
- while not exit:
- make_nested()
- old_switchinterval = sys.getswitchinterval()
- sys.setswitchinterval(1e-5)
- try:
- exit = []
- threads = []
- for i in range(N_THREADS):
- t = threading.Thread(target=run_thread)
- threads.append(t)
- with threading_helper.start_threads(threads, lambda: exit.append(1)):
- time.sleep(1.0)
- finally:
- sys.setswitchinterval(old_switchinterval)
- gc.collect()
- self.assertEqual(len(C.inits), len(C.dels))
- def test_boom(self):
- class Boom:
- def __getattr__(self, someattribute):
- del self.attr
- raise AttributeError
- a = Boom()
- b = Boom()
- a.attr = b
- b.attr = a
- gc.collect()
- garbagelen = len(gc.garbage)
- del a, b
- # a<->b are in a trash cycle now. Collection will invoke
- # Boom.__getattr__ (to see whether a and b have __del__ methods), and
- # __getattr__ deletes the internal "attr" attributes as a side effect.
- # That causes the trash cycle to get reclaimed via refcounts falling to
- # 0, thus mutating the trash graph as a side effect of merely asking
- # whether __del__ exists. This used to (before 2.3b1) crash Python.
- # Now __getattr__ isn't called.
- self.assertEqual(gc.collect(), 2)
- self.assertEqual(len(gc.garbage), garbagelen)
- def test_boom2(self):
- class Boom2:
- def __init__(self):
- self.x = 0
- def __getattr__(self, someattribute):
- self.x += 1
- if self.x > 1:
- del self.attr
- raise AttributeError
- a = Boom2()
- b = Boom2()
- a.attr = b
- b.attr = a
- gc.collect()
- garbagelen = len(gc.garbage)
- del a, b
- # Much like test_boom(), except that __getattr__ doesn't break the
- # cycle until the second time gc checks for __del__. As of 2.3b1,
- # there isn't a second time, so this simply cleans up the trash cycle.
- # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
- # reclaimed this way.
- self.assertEqual(gc.collect(), 2)
- self.assertEqual(len(gc.garbage), garbagelen)
- def test_boom_new(self):
- # boom__new and boom2_new are exactly like boom and boom2, except use
- # new-style classes.
- class Boom_New(object):
- def __getattr__(self, someattribute):
- del self.attr
- raise AttributeError
- a = Boom_New()
- b = Boom_New()
- a.attr = b
- b.attr = a
- gc.collect()
- garbagelen = len(gc.garbage)
- del a, b
- self.assertEqual(gc.collect(), 2)
- self.assertEqual(len(gc.garbage), garbagelen)
- def test_boom2_new(self):
- class Boom2_New(object):
- def __init__(self):
- self.x = 0
- def __getattr__(self, someattribute):
- self.x += 1
- if self.x > 1:
- del self.attr
- raise AttributeError
- a = Boom2_New()
- b = Boom2_New()
- a.attr = b
- b.attr = a
- gc.collect()
- garbagelen = len(gc.garbage)
- del a, b
- self.assertEqual(gc.collect(), 2)
- self.assertEqual(len(gc.garbage), garbagelen)
- def test_get_referents(self):
- alist = [1, 3, 5]
- got = gc.get_referents(alist)
- got.sort()
- self.assertEqual(got, alist)
- atuple = tuple(alist)
- got = gc.get_referents(atuple)
- got.sort()
- self.assertEqual(got, alist)
- adict = {1: 3, 5: 7}
- expected = [1, 3, 5, 7]
- got = gc.get_referents(adict)
- got.sort()
- self.assertEqual(got, expected)
- got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
- got.sort()
- self.assertEqual(got, [0, 0] + list(range(5)))
- self.assertEqual(gc.get_referents(1, 'a', 4j), [])
- def test_is_tracked(self):
- # Atomic built-in types are not tracked, user-defined objects and
- # mutable containers are.
- # NOTE: types with special optimizations (e.g. tuple) have tests
- # in their own test files instead.
- self.assertFalse(gc.is_tracked(None))
- self.assertFalse(gc.is_tracked(1))
- self.assertFalse(gc.is_tracked(1.0))
- self.assertFalse(gc.is_tracked(1.0 + 5.0j))
- self.assertFalse(gc.is_tracked(True))
- self.assertFalse(gc.is_tracked(False))
- self.assertFalse(gc.is_tracked(b"a"))
- self.assertFalse(gc.is_tracked("a"))
- self.assertFalse(gc.is_tracked(bytearray(b"a")))
- self.assertFalse(gc.is_tracked(type))
- self.assertFalse(gc.is_tracked(int))
- self.assertFalse(gc.is_tracked(object))
- self.assertFalse(gc.is_tracked(object()))
- class UserClass:
- pass
- class UserInt(int):
- pass
- # Base class is object; no extra fields.
- class UserClassSlots:
- __slots__ = ()
- # Base class is fixed size larger than object; no extra fields.
- class UserFloatSlots(float):
- __slots__ = ()
- # Base class is variable size; no extra fields.
- class UserIntSlots(int):
- __slots__ = ()
- self.assertTrue(gc.is_tracked(gc))
- self.assertTrue(gc.is_tracked(UserClass))
- self.assertTrue(gc.is_tracked(UserClass()))
- self.assertTrue(gc.is_tracked(UserInt()))
- self.assertTrue(gc.is_tracked([]))
- self.assertTrue(gc.is_tracked(set()))
- self.assertTrue(gc.is_tracked(UserClassSlots()))
- self.assertTrue(gc.is_tracked(UserFloatSlots()))
- self.assertTrue(gc.is_tracked(UserIntSlots()))
- def test_is_finalized(self):
- # Objects not tracked by the always gc return false
- self.assertFalse(gc.is_finalized(3))
- storage = []
- class Lazarus:
- def __del__(self):
- storage.append(self)
- lazarus = Lazarus()
- self.assertFalse(gc.is_finalized(lazarus))
- del lazarus
- gc.collect()
- lazarus = storage.pop()
- self.assertTrue(gc.is_finalized(lazarus))
- def test_bug1055820b(self):
- # Corresponds to temp2b.py in the bug report.
- ouch = []
- def callback(ignored):
- ouch[:] = [wr() for wr in WRs]
- Cs = [C1055820(i) for i in range(2)]
- WRs = [weakref.ref(c, callback) for c in Cs]
- c = None
- gc.collect()
- self.assertEqual(len(ouch), 0)
- # Make the two instances trash, and collect again. The bug was that
- # the callback materialized a strong reference to an instance, but gc
- # cleared the instance's dict anyway.
- Cs = None
- gc.collect()
- self.assertEqual(len(ouch), 2) # else the callbacks didn't run
- for x in ouch:
- # If the callback resurrected one of these guys, the instance
- # would be damaged, with an empty __dict__.
- self.assertEqual(x, None)
- def test_bug21435(self):
- # This is a poor test - its only virtue is that it happened to
- # segfault on Tim's Windows box before the patch for 21435 was
- # applied. That's a nasty bug relying on specific pieces of cyclic
- # trash appearing in exactly the right order in finalize_garbage()'s
- # input list.
- # But there's no reliable way to force that order from Python code,
- # so over time chances are good this test won't really be testing much
- # of anything anymore. Still, if it blows up, there's _some_
- # problem ;-)
- gc.collect()
- class A:
- pass
- class B:
- def __init__(self, x):
- self.x = x
- def __del__(self):
- self.attr = None
- def do_work():
- a = A()
- b = B(A())
- a.attr = b
- b.attr = a
- do_work()
- gc.collect() # this blows up (bad C pointer) when it fails
- @cpython_only
- @requires_subprocess()
- def test_garbage_at_shutdown(self):
- import subprocess
- code = """if 1:
- import gc
- import _testcapi
- @_testcapi.with_tp_del
- class X:
- def __init__(self, name):
- self.name = name
- def __repr__(self):
- return "<X %%r>" %% self.name
- def __tp_del__(self):
- pass
- x = X('first')
- x.x = x
- x.y = X('second')
- del x
- gc.set_debug(%s)
- """
- def run_command(code):
- p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- p.stdout.close()
- p.stderr.close()
- self.assertEqual(p.returncode, 0)
- self.assertEqual(stdout, b"")
- return stderr
- stderr = run_command(code % "0")
- self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
- b"shutdown; use", stderr)
- self.assertNotIn(b"<X 'first'>", stderr)
- # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
- stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
- self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
- b"shutdown", stderr)
- self.assertTrue(
- (b"[<X 'first'>, <X 'second'>]" in stderr) or
- (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
- # With DEBUG_SAVEALL, no additional message should get printed
- # (because gc.garbage also contains normally reclaimable cyclic
- # references, and its elements get printed at runtime anyway).
- stderr = run_command(code % "gc.DEBUG_SAVEALL")
- self.assertNotIn(b"uncollectable objects at shutdown", stderr)
- def test_gc_main_module_at_shutdown(self):
- # Create a reference cycle through the __main__ module and check
- # it gets collected at interpreter shutdown.
- code = """if 1:
- class C:
- def __del__(self):
- print('__del__ called')
- l = [C()]
- l.append(l)
- """
- rc, out, err = assert_python_ok('-c', code)
- self.assertEqual(out.strip(), b'__del__ called')
- def test_gc_ordinary_module_at_shutdown(self):
- # Same as above, but with a non-__main__ module.
- with temp_dir() as script_dir:
- module = """if 1:
- class C:
- def __del__(self):
- print('__del__ called')
- l = [C()]
- l.append(l)
- """
- code = """if 1:
- import sys
- sys.path.insert(0, %r)
- import gctest
- """ % (script_dir,)
- make_script(script_dir, 'gctest', module)
- rc, out, err = assert_python_ok('-c', code)
- self.assertEqual(out.strip(), b'__del__ called')
- def test_global_del_SystemExit(self):
- code = """if 1:
- class ClassWithDel:
- def __del__(self):
- print('__del__ called')
- a = ClassWithDel()
- a.link = a
- raise SystemExit(0)"""
- self.addCleanup(unlink, TESTFN)
- with open(TESTFN, 'w', encoding="utf-8") as script:
- script.write(code)
- rc, out, err = assert_python_ok(TESTFN)
- self.assertEqual(out.strip(), b'__del__ called')
- def test_get_stats(self):
- stats = gc.get_stats()
- self.assertEqual(len(stats), 3)
- for st in stats:
- self.assertIsInstance(st, dict)
- self.assertEqual(set(st),
- {"collected", "collections", "uncollectable"})
- self.assertGreaterEqual(st["collected"], 0)
- self.assertGreaterEqual(st["collections"], 0)
- self.assertGreaterEqual(st["uncollectable"], 0)
- # Check that collection counts are incremented correctly
- if gc.isenabled():
- self.addCleanup(gc.enable)
- gc.disable()
- old = gc.get_stats()
- gc.collect(0)
- new = gc.get_stats()
- self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
- self.assertEqual(new[1]["collections"], old[1]["collections"])
- self.assertEqual(new[2]["collections"], old[2]["collections"])
- gc.collect(2)
- new = gc.get_stats()
- self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
- self.assertEqual(new[1]["collections"], old[1]["collections"])
- self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
- def test_freeze(self):
- gc.freeze()
- self.assertGreater(gc.get_freeze_count(), 0)
- gc.unfreeze()
- self.assertEqual(gc.get_freeze_count(), 0)
- def test_get_objects(self):
- gc.collect()
- l = []
- l.append(l)
- self.assertTrue(
- any(l is element for element in gc.get_objects(generation=0))
- )
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=1))
- )
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=2))
- )
- gc.collect(generation=0)
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=0))
- )
- self.assertTrue(
- any(l is element for element in gc.get_objects(generation=1))
- )
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=2))
- )
- gc.collect(generation=1)
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=0))
- )
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=1))
- )
- self.assertTrue(
- any(l is element for element in gc.get_objects(generation=2))
- )
- gc.collect(generation=2)
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=0))
- )
- self.assertFalse(
- any(l is element for element in gc.get_objects(generation=1))
- )
- self.assertTrue(
- any(l is element for element in gc.get_objects(generation=2))
- )
- del l
- gc.collect()
- def test_get_objects_arguments(self):
- gc.collect()
- self.assertEqual(len(gc.get_objects()),
- len(gc.get_objects(generation=None)))
- self.assertRaises(ValueError, gc.get_objects, 1000)
- self.assertRaises(ValueError, gc.get_objects, -1000)
- self.assertRaises(TypeError, gc.get_objects, "1")
- self.assertRaises(TypeError, gc.get_objects, 1.234)
- def test_resurrection_only_happens_once_per_object(self):
- class A: # simple self-loop
- def __init__(self):
- self.me = self
- class Lazarus(A):
- resurrected = 0
- resurrected_instances = []
- def __del__(self):
- Lazarus.resurrected += 1
- Lazarus.resurrected_instances.append(self)
- gc.collect()
- gc.disable()
- # We start with 0 resurrections
- laz = Lazarus()
- self.assertEqual(Lazarus.resurrected, 0)
- # Deleting the instance and triggering a collection
- # resurrects the object
- del laz
- gc.collect()
- self.assertEqual(Lazarus.resurrected, 1)
- self.assertEqual(len(Lazarus.resurrected_instances), 1)
- # Clearing the references and forcing a collection
- # should not resurrect the object again.
- Lazarus.resurrected_instances.clear()
- self.assertEqual(Lazarus.resurrected, 1)
- gc.collect()
- self.assertEqual(Lazarus.resurrected, 1)
- gc.enable()
- def test_resurrection_is_transitive(self):
- class Cargo:
- def __init__(self):
- self.me = self
- class Lazarus:
- resurrected_instances = []
- def __del__(self):
- Lazarus.resurrected_instances.append(self)
- gc.collect()
- gc.disable()
- laz = Lazarus()
- cargo = Cargo()
- cargo_id = id(cargo)
- # Create a cycle between cargo and laz
- laz.cargo = cargo
- cargo.laz = laz
- # Drop the references, force a collection and check that
- # everything was resurrected.
- del laz, cargo
- gc.collect()
- self.assertEqual(len(Lazarus.resurrected_instances), 1)
- instance = Lazarus.resurrected_instances.pop()
- self.assertTrue(hasattr(instance, "cargo"))
- self.assertEqual(id(instance.cargo), cargo_id)
- gc.collect()
- gc.enable()
- def test_resurrection_does_not_block_cleanup_of_other_objects(self):
- # When a finalizer resurrects objects, stats were reporting them as
- # having been collected. This affected both collect()'s return
- # value and the dicts returned by get_stats().
- N = 100
- class A: # simple self-loop
- def __init__(self):
- self.me = self
- class Z(A): # resurrecting __del__
- def __del__(self):
- zs.append(self)
- zs = []
- def getstats():
- d = gc.get_stats()[-1]
- return d['collected'], d['uncollectable']
- gc.collect()
- gc.disable()
- # No problems if just collecting A() instances.
- oldc, oldnc = getstats()
- for i in range(N):
- A()
- t = gc.collect()
- c, nc = getstats()
- self.assertEqual(t, N) # instance objects
- self.assertEqual(c - oldc, N)
- self.assertEqual(nc - oldnc, 0)
- # But Z() is not actually collected.
- oldc, oldnc = c, nc
- Z()
- # Nothing is collected - Z() is merely resurrected.
- t = gc.collect()
- c, nc = getstats()
- self.assertEqual(t, 0)
- self.assertEqual(c - oldc, 0)
- self.assertEqual(nc - oldnc, 0)
- # Z() should not prevent anything else from being collected.
- oldc, oldnc = c, nc
- for i in range(N):
- A()
- Z()
- t = gc.collect()
- c, nc = getstats()
- self.assertEqual(t, N)
- self.assertEqual(c - oldc, N)
- self.assertEqual(nc - oldnc, 0)
- # The A() trash should have been reclaimed already but the
- # 2 copies of Z are still in zs (and the associated dicts).
- oldc, oldnc = c, nc
- zs.clear()
- t = gc.collect()
- c, nc = getstats()
- self.assertEqual(t, 2)
- self.assertEqual(c - oldc, 2)
- self.assertEqual(nc - oldnc, 0)
- gc.enable()
- @unittest.skipIf(ContainerNoGC is None,
- 'requires ContainerNoGC extension type')
- def test_trash_weakref_clear(self):
- # Test that trash weakrefs are properly cleared (bpo-38006).
- #
- # Structure we are creating:
- #
- # Z <- Y <- A--+--> WZ -> C
- # ^ |
- # +--+
- # where:
- # WZ is a weakref to Z with callback C
- # Y doesn't implement tp_traverse
- # A contains a reference to itself, Y and WZ
- #
- # A, Y, Z, WZ are all trash. The GC doesn't know that Z is trash
- # because Y does not implement tp_traverse. To show the bug, WZ needs
- # to live long enough so that Z is deallocated before it. Then, if
- # gcmodule is buggy, when Z is being deallocated, C will run.
- #
- # To ensure WZ lives long enough, we put it in a second reference
- # cycle. That trick only works due to the ordering of the GC prev/next
- # linked lists. So, this test is a bit fragile.
- #
- # The bug reported in bpo-38006 is caused because the GC did not
- # clear WZ before starting the process of calling tp_clear on the
- # trash. Normally, handle_weakrefs() would find the weakref via Z and
- # clear it. However, since the GC cannot find Z, WR is not cleared and
- # it can execute during delete_garbage(). That can lead to disaster
- # since the callback might tinker with objects that have already had
- # tp_clear called on them (leaving them in possibly invalid states).
- callback = unittest.mock.Mock()
- class A:
- __slots__ = ['a', 'y', 'wz']
- class Z:
- pass
- # setup required object graph, as described above
- a = A()
- a.a = a
- a.y = ContainerNoGC(Z())
- a.wz = weakref.ref(a.y.value, callback)
- # create second cycle to keep WZ alive longer
- wr_cycle = [a.wz]
- wr_cycle.append(wr_cycle)
- # ensure trash unrelated to this test is gone
- gc.collect()
- gc.disable()
- # release references and create trash
- del a, wr_cycle
- gc.collect()
- # if called, it means there is a bug in the GC. The weakref should be
- # cleared before Z dies.
- callback.assert_not_called()
- gc.enable()
- class GCCallbackTests(unittest.TestCase):
- def setUp(self):
- # Save gc state and disable it.
- self.enabled = gc.isenabled()
- gc.disable()
- self.debug = gc.get_debug()
- gc.set_debug(0)
- gc.callbacks.append(self.cb1)
- gc.callbacks.append(self.cb2)
- self.othergarbage = []
- def tearDown(self):
- # Restore gc state
- del self.visit
- gc.callbacks.remove(self.cb1)
- gc.callbacks.remove(self.cb2)
- gc.set_debug(self.debug)
- if self.enabled:
- gc.enable()
- # destroy any uncollectables
- gc.collect()
- for obj in gc.garbage:
- if isinstance(obj, Uncollectable):
- obj.partner = None
- del gc.garbage[:]
- del self.othergarbage
- gc.collect()
- def preclean(self):
- # Remove all fluff from the system. Invoke this function
- # manually rather than through self.setUp() for maximum
- # safety.
- self.visit = []
- gc.collect()
- garbage, gc.garbage[:] = gc.garbage[:], []
- self.othergarbage.append(garbage)
- self.visit = []
- def cb1(self, phase, info):
- self.visit.append((1, phase, dict(info)))
- def cb2(self, phase, info):
- self.visit.append((2, phase, dict(info)))
- if phase == "stop" and hasattr(self, "cleanup"):
- # Clean Uncollectable from garbage
- uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
- gc.garbage[:] = [e for e in gc.garbage
- if not isinstance(e, Uncollectable)]
- for e in uc:
- e.partner = None
- def test_collect(self):
- self.preclean()
- gc.collect()
- # Algorithmically verify the contents of self.visit
- # because it is long and tortuous.
- # Count the number of visits to each callback
- n = [v[0] for v in self.visit]
- n1 = [i for i in n if i == 1]
- n2 = [i for i in n if i == 2]
- self.assertEqual(n1, [1]*2)
- self.assertEqual(n2, [2]*2)
- # Count that we got the right number of start and stop callbacks.
- n = [v[1] for v in self.visit]
- n1 = [i for i in n if i == "start"]
- n2 = [i for i in n if i == "stop"]
- self.assertEqual(n1, ["start"]*2)
- self.assertEqual(n2, ["stop"]*2)
- # Check that we got the right info dict for all callbacks
- for v in self.visit:
- info = v[2]
- self.assertTrue("generation" in info)
- self.assertTrue("collected" in info)
- self.assertTrue("uncollectable" in info)
- def test_collect_generation(self):
- self.preclean()
- gc.collect(2)
- for v in self.visit:
- info = v[2]
- self.assertEqual(info["generation"], 2)
- @cpython_only
- def test_collect_garbage(self):
- self.preclean()
- # Each of these cause two objects to be garbage:
- Uncollectable()
- Uncollectable()
- C1055820(666)
- gc.collect()
- for v in self.visit:
- if v[1] != "stop":
- continue
- info = v[2]
- self.assertEqual(info["collected"], 1)
- self.assertEqual(info["uncollectable"], 4)
- # We should now have the Uncollectables in gc.garbage
- self.assertEqual(len(gc.garbage), 4)
- for e in gc.garbage:
- self.assertIsInstance(e, Uncollectable)
- # Now, let our callback handle the Uncollectable instances
- self.cleanup=True
- self.visit = []
- gc.garbage[:] = []
- gc.collect()
- for v in self.visit:
- if v[1] != "stop":
- continue
- info = v[2]
- self.assertEqual(info["collected"], 0)
- self.assertEqual(info["uncollectable"], 2)
- # Uncollectables should be gone
- self.assertEqual(len(gc.garbage), 0)
- @unittest.skipIf(BUILD_WITH_NDEBUG,
- 'built with -NDEBUG')
- def test_refcount_errors(self):
- self.preclean()
- # Verify the "handling" of objects with broken refcounts
- # Skip the test if ctypes is not available
- import_module("ctypes")
- import subprocess
- code = textwrap.dedent('''
- from test.support import gc_collect, SuppressCrashReport
- a = [1, 2, 3]
- b = [a]
- # Avoid coredump when Py_FatalError() calls abort()
- SuppressCrashReport().__enter__()
- # Simulate the refcount of "a" being too low (compared to the
- # references held on it by live data), but keeping it above zero
- # (to avoid deallocating it):
- import ctypes
- ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
- # The garbage collector should now have a fatal error
- # when it reaches the broken object
- gc_collect()
- ''')
- p = subprocess.Popen([sys.executable, "-c", code],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- p.stdout.close()
- p.stderr.close()
- # Verify that stderr has a useful error message:
- self.assertRegex(stderr,
- br'gcmodule\.c:[0-9]+: gc_decref: Assertion "gc_get_refs\(g\) > 0" failed.')
- self.assertRegex(stderr,
- br'refcount is too small')
- # "address : 0x7fb5062efc18"
- # "address : 7FB5062EFC18"
- address_regex = br'[0-9a-fA-Fx]+'
- self.assertRegex(stderr,
- br'object address : ' + address_regex)
- self.assertRegex(stderr,
- br'object refcount : 1')
- self.assertRegex(stderr,
- br'object type : ' + address_regex)
- self.assertRegex(stderr,
- br'object type name: list')
- self.assertRegex(stderr,
- br'object repr : \[1, 2, 3\]')
- class GCTogglingTests(unittest.TestCase):
- def setUp(self):
- gc.enable()
- def tearDown(self):
- gc.disable()
- def test_bug1055820c(self):
- # Corresponds to temp2c.py in the bug report. This is pretty
- # elaborate.
- c0 = C1055820(0)
- # Move c0 into generation 2.
- gc.collect()
- c1 = C1055820(1)
- c1.keep_c0_alive = c0
- del c0.loop # now only c1 keeps c0 alive
- c2 = C1055820(2)
- c2wr = weakref.ref(c2) # no callback!
- ouch = []
- def callback(ignored):
- ouch[:] = [c2wr()]
- # The callback gets associated with a wr on an object in generation 2.
- c0wr = weakref.ref(c0, callback)
- c0 = c1 = c2 = None
- # What we've set up: c0, c1, and c2 are all trash now. c0 is in
- # generation 2. The only thing keeping it alive is that c1 points to
- # it. c1 and c2 are in generation 0, and are in self-loops. There's a
- # global weakref to c2 (c2wr), but that weakref has no callback.
- # There's also a global weakref to c0 (c0wr), and that does have a
- # callback, and that callback references c2 via c2wr().
- #
- # c0 has a wr with callback, which references c2wr
- # ^
- # |
- # | Generation 2 above dots
- #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
- # | Generation 0 below dots
- # |
- # |
- # ^->c1 ^->c2 has a wr but no callback
- # | | | |
- # <--v <--v
- #
- # So this is the nightmare: when generation 0 gets collected, we see
- # that c2 has a callback-free weakref, and c1 doesn't even have a
- # weakref. Collecting generation 0 doesn't see c0 at all, and c0 is
- # the only object that has a weakref with a callback. gc clears c1
- # and c2. Clearing c1 has the side effect of dropping the refcount on
- # c0 to 0, so c0 goes away (despite that it's in an older generation)
- # and c0's wr callback triggers. That in turn materializes a reference
- # to c2 via c2wr(), but c2 gets cleared anyway by gc.
- # We want to let gc happen "naturally", to preserve the distinction
- # between generations.
- junk = []
- i = 0
- detector = GC_Detector()
- while not detector.gc_happened:
- i += 1
- if i > 10000:
- self.fail("gc didn't happen after 10000 iterations")
- self.assertEqual(len(ouch), 0)
- junk.append([]) # this will eventually trigger gc
- self.assertEqual(len(ouch), 1) # else the callback wasn't invoked
- for x in ouch:
- # If the callback resurrected c2, the instance would be damaged,
- # with an empty __dict__.
- self.assertEqual(x, None)
- def test_bug1055820d(self):
- # Corresponds to temp2d.py in the bug report. This is very much like
- # test_bug1055820c, but uses a __del__ method instead of a weakref
- # callback to sneak in a resurrection of cyclic trash.
- ouch = []
- class D(C1055820):
- def __del__(self):
- ouch[:] = [c2wr()]
- d0 = D(0)
- # Move all the above into generation 2.
- gc.collect()
- c1 = C1055820(1)
- c1.keep_d0_alive = d0
- del d0.loop # now only c1 keeps d0 alive
- c2 = C1055820(2)
- c2wr = weakref.ref(c2) # no callback!
- d0 = c1 = c2 = None
- # What we've set up: d0, c1, and c2 are all trash now. d0 is in
- # generation 2. The only thing keeping it alive is that c1 points to
- # it. c1 and c2 are in generation 0, and are in self-loops. There's
- # a global weakref to c2 (c2wr), but that weakref has no callback.
- # There are no other weakrefs.
- #
- # d0 has a __del__ method that references c2wr
- # ^
- # |
- # | Generation 2 above dots
- #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
- # | Generation 0 below dots
- # |
- # |
- # ^->c1 ^->c2 has a wr but no callback
- # | | | |
- # <--v <--v
- #
- # So this is the nightmare: when generation 0 gets collected, we see
- # that c2 has a callback-free weakref, and c1 doesn't even have a
- # weakref. Collecting generation 0 doesn't see d0 at all. gc clears
- # c1 and c2. Clearing c1 has the side effect of dropping the refcount
- # on d0 to 0, so d0 goes away (despite that it's in an older
- # generation) and d0's __del__ triggers. That in turn materializes
- # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
- # We want to let gc happen "naturally", to preserve the distinction
- # between generations.
- detector = GC_Detector()
- junk = []
- i = 0
- while not detector.gc_happened:
- i += 1
- if i > 10000:
- self.fail("gc didn't happen after 10000 iterations")
- self.assertEqual(len(ouch), 0)
- junk.append([]) # this will eventually trigger gc
- self.assertEqual(len(ouch), 1) # else __del__ wasn't invoked
- for x in ouch:
- # If __del__ resurrected c2, the instance would be damaged, with an
- # empty __dict__.
- self.assertEqual(x, None)
- class PythonFinalizationTests(unittest.TestCase):
- def test_ast_fini(self):
- # bpo-44184: Regression test for subtype_dealloc() when deallocating
- # an AST instance also destroy its AST type: subtype_dealloc() must
- # not access the type memory after deallocating the instance, since
- # the type memory can be freed as well. The test is also related to
- # _PyAST_Fini() which clears references to AST types.
- code = textwrap.dedent("""
- import ast
- import codecs
- # Small AST tree to keep their AST types alive
- tree = ast.parse("def f(x, y): return 2*x-y")
- x = [tree]
- x.append(x)
- # Put the cycle somewhere to survive until the last GC collection.
- # Codec search functions are only cleared at the end of
- # interpreter_clear().
- def search_func(encoding):
- return None
- search_func.a = x
- codecs.register(search_func)
- """)
- assert_python_ok("-c", code)
- def setUpModule():
- global enabled, debug
- enabled = gc.isenabled()
- gc.disable()
- assert not gc.isenabled()
- debug = gc.get_debug()
- gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
- gc.collect() # Delete 2nd generation garbage
- def tearDownModule():
- gc.set_debug(debug)
- # test gc.enable() even if GC is disabled by default
- if verbose:
- print("restoring automatic collection")
- # make sure to always test gc.enable()
- gc.enable()
- assert gc.isenabled()
- if not enabled:
- gc.disable()
- if __name__ == "__main__":
- unittest.main()
|