# test_atexit.py (3.2 KB)

  1. import atexit
  2. import os
  3. import sys
  4. import textwrap
  5. import unittest
  6. from test import support
  7. from test.support import script_helper
  8. class GeneralTest(unittest.TestCase):
  9. def test_general(self):
  10. # Run _test_atexit.py in a subprocess since it calls atexit._clear()
  11. script = support.findfile("_test_atexit.py")
  12. script_helper.run_test_script(script)
  13. class FunctionalTest(unittest.TestCase):
  14. def test_shutdown(self):
  15. # Actually test the shutdown mechanism in a subprocess
  16. code = textwrap.dedent("""
  17. import atexit
  18. def f(msg):
  19. print(msg)
  20. atexit.register(f, "one")
  21. atexit.register(f, "two")
  22. """)
  23. res = script_helper.assert_python_ok("-c", code)
  24. self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
  25. self.assertFalse(res.err)
  26. def test_atexit_instances(self):
  27. # bpo-42639: It is safe to have more than one atexit instance.
  28. code = textwrap.dedent("""
  29. import sys
  30. import atexit as atexit1
  31. del sys.modules['atexit']
  32. import atexit as atexit2
  33. del sys.modules['atexit']
  34. assert atexit2 is not atexit1
  35. atexit1.register(print, "atexit1")
  36. atexit2.register(print, "atexit2")
  37. """)
  38. res = script_helper.assert_python_ok("-c", code)
  39. self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"])
  40. self.assertFalse(res.err)
  41. @support.cpython_only
  42. class SubinterpreterTest(unittest.TestCase):
  43. def test_callbacks_leak(self):
  44. # This test shows a leak in refleak mode if atexit doesn't
  45. # take care to free callbacks in its per-subinterpreter module
  46. # state.
  47. n = atexit._ncallbacks()
  48. code = textwrap.dedent(r"""
  49. import atexit
  50. def f():
  51. pass
  52. atexit.register(f)
  53. del atexit
  54. """)
  55. ret = support.run_in_subinterp(code)
  56. self.assertEqual(ret, 0)
  57. self.assertEqual(atexit._ncallbacks(), n)
  58. def test_callbacks_leak_refcycle(self):
  59. # Similar to the above, but with a refcycle through the atexit
  60. # module.
  61. n = atexit._ncallbacks()
  62. code = textwrap.dedent(r"""
  63. import atexit
  64. def f():
  65. pass
  66. atexit.register(f)
  67. atexit.__atexit = atexit
  68. """)
  69. ret = support.run_in_subinterp(code)
  70. self.assertEqual(ret, 0)
  71. self.assertEqual(atexit._ncallbacks(), n)
  72. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  73. def test_callback_on_subinterpreter_teardown(self):
  74. # This tests if a callback is called on
  75. # subinterpreter teardown.
  76. expected = b"The test has passed!"
  77. r, w = os.pipe()
  78. code = textwrap.dedent(r"""
  79. import os
  80. import atexit
  81. def callback():
  82. os.write({:d}, b"The test has passed!")
  83. atexit.register(callback)
  84. """.format(w))
  85. ret = support.run_in_subinterp(code)
  86. os.close(w)
  87. self.assertEqual(os.read(r, len(expected)), expected)
  88. os.close(r)
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()