| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538 |
- """
- Tests for object finalization semantics, as outlined in PEP 442.
- """
- import contextlib
- import gc
- import unittest
- import weakref
try:
    from _testcapi import with_tp_del
except ImportError:
    # _testcapi could not be imported: substitute a decorator whose
    # product refuses to be instantiated.
    def with_tp_del(cls):
        """
        Fallback decorator: discard *cls* and return a placeholder class
        that raises TypeError whenever instantiation is attempted.
        """
        message = 'requires _testcapi.with_tp_del'

        class C(object):
            def __new__(cls, *args, **kwargs):
                raise TypeError(message)

        return C
try:
    from _testcapi import without_gc
except ImportError:
    # _testcapi could not be imported: substitute a decorator whose
    # product refuses to be instantiated.
    def without_gc(cls):
        """
        Fallback decorator: discard *cls* and return a placeholder class
        that raises TypeError whenever instantiation is attempted.
        """
        message = 'requires _testcapi.without_gc'

        class C:
            def __new__(cls, *args, **kwargs):
                raise TypeError(message)

        return C
- from test import support
class NonGCSimpleBase:
    """
    Common ancestor of every object under test.

    Bookkeeping lives in class-level lists shared by all subclasses:
    ``survivors`` (objects stored by resurrecting side effects),
    ``del_calls`` / ``tp_del_calls`` (ids recorded by the finalizers) and
    ``errors`` (exceptions caught inside a finalizer).
    """

    survivors = []
    del_calls = []
    tp_del_calls = []
    errors = []

    # True while test() is tearing down; finalizers then record nothing.
    _cleaning = False

    __slots__ = ()

    @classmethod
    def _cleanup(cls):
        # Drop everything that may keep objects alive, collect, and only
        # then reset the call logs.
        for bucket in (cls.survivors, cls.errors, gc.garbage):
            bucket.clear()
        gc.collect()
        for bucket in (cls.del_calls, cls.tp_del_calls):
            bucket.clear()

    @classmethod
    @contextlib.contextmanager
    def test(cls):
        """
        Context manager wrapping a finalization test.

        Runs the body with the GC disabled, re-raises the first error a
        finalizer recorded (if any), and always cleans up on exit.
        """
        with support.disable_gc():
            cls.del_calls.clear()
            cls.tp_del_calls.clear()
            NonGCSimpleBase._cleaning = False
            try:
                yield
                recorded = cls.errors
                if recorded:
                    raise recorded[0]
            finally:
                NonGCSimpleBase._cleaning = True
                cls._cleanup()

    def check_sanity(self):
        """
        Hook: verify the object is not in a broken state.
        """

    def __del__(self):
        """
        PEP 442 finalizer: log the call, run the sanity check, then
        trigger the side effect.  Exceptions are stored in ``errors``.
        """
        try:
            if self._cleaning:
                return
            self.del_calls.append(id(self))
            self.check_sanity()
            self.side_effect()
        except Exception as exc:
            self.errors.append(exc)

    def side_effect(self):
        """
        Hook: action performed when the object is being destroyed.
        """
class SimpleBase(NonGCSimpleBase):
    """
    Test object that remembers its own id() at construction time so the
    sanity check can compare against it later.
    """

    def __init__(self):
        self.id_ = id(self)

    def check_sanity(self):
        # The stored id must still match the object's id.
        assert self.id_ == id(self)
@without_gc
class NonGC(NonGCSimpleBase):
    """
    Slot-less test object passed through the without_gc decorator.
    """

    __slots__ = ()
@without_gc
class NonGCResurrector(NonGCSimpleBase):
    """
    Slot-less test object, passed through without_gc, whose side effect
    keeps it alive.
    """

    __slots__ = ()

    def side_effect(self):
        """
        Bring self back to life by appending it to the shared survivors
        list.
        """
        self.survivors.append(self)
class Simple(SimpleBase):
    """
    SimpleBase with no overrides.
    """
- # Can't inherit from NonGCResurrector, in case importing without_gc fails.
class SimpleResurrector(SimpleBase):
    """
    SimpleBase whose side effect keeps the object alive.
    """

    def side_effect(self):
        """
        Bring self back to life by appending it to the shared survivors
        list.
        """
        self.survivors.append(self)
class TestBase:
    """
    Mixin with setUp/tearDown handling of gc.garbage and assertion
    helpers that compare sorted id lists.
    """

    def setUp(self):
        # Remember what was in gc.garbage, then start from an empty list.
        self.old_garbage = list(gc.garbage)
        del gc.garbage[:]

    def tearDown(self):
        # None of the tests here should put anything in gc.garbage
        try:
            self.assertEqual(gc.garbage, [])
        finally:
            del self.old_garbage
            gc.collect()

    def assert_del_calls(self, ids):
        recorded = sorted(SimpleBase.del_calls)
        self.assertEqual(recorded, sorted(ids))

    def assert_tp_del_calls(self, ids):
        recorded = sorted(SimpleBase.tp_del_calls)
        self.assertEqual(recorded, sorted(ids))

    def assert_survivors(self, ids):
        alive = sorted(id(obj) for obj in SimpleBase.survivors)
        self.assertEqual(alive, sorted(ids))

    def assert_garbage(self, ids):
        uncollected = sorted(id(obj) for obj in gc.garbage)
        self.assertEqual(uncollected, sorted(ids))

    def clear_survivors(self):
        SimpleBase.survivors.clear()
class SimpleFinalizationTest(TestBase, unittest.TestCase):
    """
    Finalization of objects that are not part of any reference cycle.
    """

    def test_simple(self):
        with SimpleBase.test():
            obj = Simple()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIs(ref(), None)
            # A further collection must not add any finalizer call.
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        with SimpleBase.test():
            obj = SimpleResurrector()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors(expected)
            self.assertIsNot(ref(), None)
            # Drop the resurrected reference; no new finalizer call is
            # expected afterwards.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
        self.assertIs(ref(), None)

    @support.cpython_only
    def test_non_gc(self):
        with SimpleBase.test():
            obj = NonGC()
            self.assertFalse(gc.is_tracked(obj))
            expected = [id(obj)]
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])

    @support.cpython_only
    def test_non_gc_resurrect(self):
        with SimpleBase.test():
            obj = NonGCResurrector()
            self.assertFalse(gc.is_tracked(obj))
            expected = [id(obj)]
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors(expected)
            # Here the finalizer is expected to run a second time and
            # resurrect the object again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected * 2)
            self.assert_survivors(expected)
class SelfCycleBase:
    """
    Mixin that makes each instance hold a reference to itself.
    """

    def __init__(self):
        super().__init__()
        # Create the one-object reference cycle.
        self.ref = self

    def check_sanity(self):
        super().check_sanity()
        # The self-reference must still be in place.
        assert self.ref is self
class SimpleSelfCycle(SelfCycleBase, Simple):
    """
    Self-referencing variant of Simple.
    """
class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
    """
    Self-referencing variant of SimpleResurrector.
    """
class SuicidalSelfCycle(SelfCycleBase, Simple):
    """
    Self-referencing object whose side effect removes the self-reference.
    """

    def side_effect(self):
        """
        Break the reference cycle by hand.
        """
        self.ref = None
class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
    """
    Finalization of an object whose only cyclic reference points back to
    itself.
    """

    def test_simple(self):
        with SimpleBase.test():
            obj = SimpleSelfCycle()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIs(ref(), None)
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # Test that __del__ can resurrect the object being finalized.
        with SimpleBase.test():
            obj = SelfCycleResurrector()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors(expected)
            # XXX is this desirable?
            self.assertIs(ref(), None)
            # When trying to destroy the object a second time, __del__
            # isn't called anymore (and the object isn't resurrected).
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIs(ref(), None)

    def test_simple_suicide(self):
        # Test the GC is able to deal with an object that kills its last
        # reference during __del__.
        with SimpleBase.test():
            obj = SuicidalSelfCycle()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIs(ref(), None)
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertIs(ref(), None)
class ChainedBase:
    """
    Mixin for nodes linked through ``left`` / ``right`` attributes.
    """

    def chain(self, left):
        """
        Link self after *left*: set ``self.left`` and ``left.right``, and
        mark self as not suicided.
        """
        self.suicided = False
        self.left = left
        left.right = self

    def check_sanity(self):
        super().check_sanity()
        if self.suicided:
            # A suicided node has dropped both of its links.
            assert self.left is None
            assert self.right is None
            return
        # A neighbour points back at self unless it has suicided, in
        # which case its link must be None.
        prev = self.left
        wanted = None if prev.suicided else self
        assert prev.right is wanted
        nxt = self.right
        wanted = None if nxt.suicided else self
        assert nxt.left is wanted
class SimpleChained(ChainedBase, Simple):
    """
    Chain node built on Simple.
    """
class ChainedResurrector(ChainedBase, SimpleResurrector):
    """
    Chain node built on SimpleResurrector.
    """
class SuicidalChained(ChainedBase, Simple):
    """
    Chain node whose side effect unlinks it from both neighbours.
    """

    def side_effect(self):
        """
        Break the reference cycle by hand.
        """
        self.suicided = True
        self.left = None
        self.right = None
class CycleChainFinalizationTest(TestBase, unittest.TestCase):
    """
    Finalization of a cyclic chain of objects.  Same spirit as the
    self-cycle tests, but the collectable object graph is no longer
    trivial.
    """

    def build_chain(self, classes):
        # Instantiate one node per class and link each node to its
        # predecessor; index 0 links to the last node, closing the ring.
        nodes = [cls() for cls in classes]
        for index, node in enumerate(nodes):
            node.chain(nodes[index - 1])
        return nodes

    def check_non_resurrecting_chain(self, classes):
        count = len(classes)
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            expected = [id(node) for node in nodes]
            refs = [weakref.ref(node) for node in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])
            self.assertEqual([ref() for ref in refs], [None] * count)
            gc.collect()
            self.assert_del_calls(expected)

    def check_resurrecting_chain(self, classes):
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            count = len(nodes)
            expected = [id(node) for node in nodes]
            # Only SimpleResurrector instances are expected to survive.
            survivor_ids = [id(node) for node in nodes
                            if isinstance(node, SimpleResurrector)]
            refs = [weakref.ref(node) for node in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors(survivor_ids)
            # XXX desirable?
            self.assertEqual([ref() for ref in refs], [None] * count)
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_survivors([])

    def test_homogenous(self):
        self.check_non_resurrecting_chain([SimpleChained] * 3)

    def test_homogenous_resurrect(self):
        self.check_resurrecting_chain([ChainedResurrector] * 3)

    def test_homogenous_suicidal(self):
        self.check_non_resurrecting_chain([SuicidalChained] * 3)

    def test_heterogenous_suicidal_one(self):
        layout = [SuicidalChained, SimpleChained] * 2
        self.check_non_resurrecting_chain(layout)

    def test_heterogenous_suicidal_two(self):
        layout = [SuicidalChained] * 2 + [SimpleChained] * 2
        self.check_non_resurrecting_chain(layout)

    def test_heterogenous_resurrect_one(self):
        layout = [ChainedResurrector, SimpleChained] * 2
        self.check_resurrecting_chain(layout)

    def test_heterogenous_resurrect_two(self):
        layout = [ChainedResurrector, SimpleChained, SuicidalChained] * 2
        self.check_resurrecting_chain(layout)

    def test_heterogenous_resurrect_three(self):
        layout = ([ChainedResurrector] * 2
                  + [SimpleChained] * 2
                  + [SuicidalChained] * 2)
        self.check_resurrecting_chain(layout)
- # NOTE: the tp_del slot isn't automatically inherited, so we have to call
- # with_tp_del() for each instantiated class.
class LegacyBase(SimpleBase):
    """
    Base for objects with both a __del__ and a __tp_del__ method; the
    side effect is triggered from __tp_del__ only.
    """

    def __del__(self):
        try:
            # Do not invoke side_effect here, since we are now exercising
            # the tp_del slot.
            if self._cleaning:
                return
            self.del_calls.append(id(self))
            self.check_sanity()
        except Exception as exc:
            self.errors.append(exc)

    def __tp_del__(self):
        """
        Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
        """
        try:
            if self._cleaning:
                return
            self.tp_del_calls.append(id(self))
            self.check_sanity()
            self.side_effect()
        except Exception as exc:
            self.errors.append(exc)
@with_tp_del
class Legacy(LegacyBase):
    """
    LegacyBase passed through with_tp_del, with no overrides.
    """
@with_tp_del
class LegacyResurrector(LegacyBase):
    """
    LegacyBase, passed through with_tp_del, whose side effect keeps the
    object alive.
    """

    def side_effect(self):
        """
        Bring self back to life by appending it to the shared survivors
        list.
        """
        self.survivors.append(self)
@with_tp_del
class LegacySelfCycle(SelfCycleBase, LegacyBase):
    """
    Self-referencing LegacyBase passed through with_tp_del.
    """
@support.cpython_only
class LegacyFinalizationTest(TestBase, unittest.TestCase):
    """
    Finalization of objects that have a tp_del.
    """

    def tearDown(self):
        # These tests need to clean up a bit more, since they create
        # uncollectable objects.
        gc.garbage.clear()
        gc.collect()
        super().tearDown()

    def test_legacy(self):
        with SimpleBase.test():
            obj = Legacy()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected)
            self.assert_survivors([])
            self.assertIs(ref(), None)
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected)

    def test_legacy_resurrect(self):
        with SimpleBase.test():
            obj = LegacyResurrector()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected)
            self.assert_survivors(expected)
            # weakrefs are cleared before tp_del is called.
            self.assertIs(ref(), None)
            # After dropping the survivor, __tp_del__ is expected to have
            # run a second time while __del__ is not.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(expected)
            self.assert_tp_del_calls(expected * 2)
            self.assert_survivors(expected)
        self.assertIs(ref(), None)

    def test_legacy_self_cycle(self):
        # Self-cycles with legacy finalizers end up in gc.garbage.
        with SimpleBase.test():
            obj = LegacySelfCycle()
            expected = [id(obj)]
            ref = weakref.ref(obj)
            del obj
            gc.collect()
            self.assert_del_calls([])
            self.assert_tp_del_calls([])
            self.assert_survivors([])
            self.assert_garbage(expected)
            self.assertIsNot(ref(), None)
            # Break the cycle to allow collection
            gc.garbage[0].ref = None
        # Checked only after SimpleBase.test() has exited and cleaned up.
        self.assert_garbage([])
        self.assertIs(ref(), None)
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()
|