# test_break.py (11 KB)
  1. import gc
  2. import io
  3. import os
  4. import sys
  5. import signal
  6. import weakref
  7. import unittest
  8. from test import support
  9. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  10. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  11. class TestBreak(unittest.TestCase):
  12. int_handler = None
  13. # This number was smart-guessed, previously tests were failing
  14. # after 7th run. So, we take `x * 2 + 1` to be sure.
  15. default_repeats = 15
  16. def setUp(self):
  17. self._default_handler = signal.getsignal(signal.SIGINT)
  18. if self.int_handler is not None:
  19. signal.signal(signal.SIGINT, self.int_handler)
  20. def tearDown(self):
  21. signal.signal(signal.SIGINT, self._default_handler)
  22. unittest.signals._results = weakref.WeakKeyDictionary()
  23. unittest.signals._interrupt_handler = None
  24. def withRepeats(self, test_function, repeats=None):
  25. if not support.check_impl_detail(cpython=True):
  26. # Override repeats count on non-cpython to execute only once.
  27. # Because this test only makes sense to be repeated on CPython.
  28. repeats = 1
  29. elif repeats is None:
  30. repeats = self.default_repeats
  31. for repeat in range(repeats):
  32. with self.subTest(repeat=repeat):
  33. # We don't run `setUp` for the very first repeat
  34. # and we don't run `tearDown` for the very last one,
  35. # because they are handled by the test class itself.
  36. if repeat != 0:
  37. self.setUp()
  38. try:
  39. test_function()
  40. finally:
  41. if repeat != repeats - 1:
  42. self.tearDown()
  43. def testInstallHandler(self):
  44. default_handler = signal.getsignal(signal.SIGINT)
  45. unittest.installHandler()
  46. self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
  47. try:
  48. pid = os.getpid()
  49. os.kill(pid, signal.SIGINT)
  50. except KeyboardInterrupt:
  51. self.fail("KeyboardInterrupt not handled")
  52. self.assertTrue(unittest.signals._interrupt_handler.called)
  53. def testRegisterResult(self):
  54. result = unittest.TestResult()
  55. self.assertNotIn(result, unittest.signals._results)
  56. unittest.registerResult(result)
  57. try:
  58. self.assertIn(result, unittest.signals._results)
  59. finally:
  60. unittest.removeResult(result)
  61. def testInterruptCaught(self):
  62. def test(result):
  63. pid = os.getpid()
  64. os.kill(pid, signal.SIGINT)
  65. result.breakCaught = True
  66. self.assertTrue(result.shouldStop)
  67. def test_function():
  68. result = unittest.TestResult()
  69. unittest.installHandler()
  70. unittest.registerResult(result)
  71. self.assertNotEqual(
  72. signal.getsignal(signal.SIGINT),
  73. self._default_handler,
  74. )
  75. try:
  76. test(result)
  77. except KeyboardInterrupt:
  78. self.fail("KeyboardInterrupt not handled")
  79. self.assertTrue(result.breakCaught)
  80. self.withRepeats(test_function)
  81. def testSecondInterrupt(self):
  82. # Can't use skipIf decorator because the signal handler may have
  83. # been changed after defining this method.
  84. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
  85. self.skipTest("test requires SIGINT to not be ignored")
  86. def test(result):
  87. pid = os.getpid()
  88. os.kill(pid, signal.SIGINT)
  89. result.breakCaught = True
  90. self.assertTrue(result.shouldStop)
  91. os.kill(pid, signal.SIGINT)
  92. self.fail("Second KeyboardInterrupt not raised")
  93. def test_function():
  94. result = unittest.TestResult()
  95. unittest.installHandler()
  96. unittest.registerResult(result)
  97. with self.assertRaises(KeyboardInterrupt):
  98. test(result)
  99. self.assertTrue(result.breakCaught)
  100. self.withRepeats(test_function)
  101. def testTwoResults(self):
  102. def test_function():
  103. unittest.installHandler()
  104. result = unittest.TestResult()
  105. unittest.registerResult(result)
  106. new_handler = signal.getsignal(signal.SIGINT)
  107. result2 = unittest.TestResult()
  108. unittest.registerResult(result2)
  109. self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
  110. result3 = unittest.TestResult()
  111. try:
  112. os.kill(os.getpid(), signal.SIGINT)
  113. except KeyboardInterrupt:
  114. self.fail("KeyboardInterrupt not handled")
  115. self.assertTrue(result.shouldStop)
  116. self.assertTrue(result2.shouldStop)
  117. self.assertFalse(result3.shouldStop)
  118. self.withRepeats(test_function)
  119. def testHandlerReplacedButCalled(self):
  120. # Can't use skipIf decorator because the signal handler may have
  121. # been changed after defining this method.
  122. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
  123. self.skipTest("test requires SIGINT to not be ignored")
  124. def test_function():
  125. # If our handler has been replaced (is no longer installed) but is
  126. # called by the *new* handler, then it isn't safe to delay the
  127. # SIGINT and we should immediately delegate to the default handler
  128. unittest.installHandler()
  129. handler = signal.getsignal(signal.SIGINT)
  130. def new_handler(frame, signum):
  131. handler(frame, signum)
  132. signal.signal(signal.SIGINT, new_handler)
  133. try:
  134. os.kill(os.getpid(), signal.SIGINT)
  135. except KeyboardInterrupt:
  136. pass
  137. else:
  138. self.fail("replaced but delegated handler doesn't raise interrupt")
  139. self.withRepeats(test_function)
  140. def testRunner(self):
  141. # Creating a TextTestRunner with the appropriate argument should
  142. # register the TextTestResult it creates
  143. runner = unittest.TextTestRunner(stream=io.StringIO())
  144. result = runner.run(unittest.TestSuite())
  145. self.assertIn(result, unittest.signals._results)
  146. def testWeakReferences(self):
  147. # Calling registerResult on a result should not keep it alive
  148. result = unittest.TestResult()
  149. unittest.registerResult(result)
  150. ref = weakref.ref(result)
  151. del result
  152. # For non-reference counting implementations
  153. gc.collect();gc.collect()
  154. self.assertIsNone(ref())
  155. def testRemoveResult(self):
  156. result = unittest.TestResult()
  157. unittest.registerResult(result)
  158. unittest.installHandler()
  159. self.assertTrue(unittest.removeResult(result))
  160. # Should this raise an error instead?
  161. self.assertFalse(unittest.removeResult(unittest.TestResult()))
  162. try:
  163. pid = os.getpid()
  164. os.kill(pid, signal.SIGINT)
  165. except KeyboardInterrupt:
  166. pass
  167. self.assertFalse(result.shouldStop)
  168. def testMainInstallsHandler(self):
  169. failfast = object()
  170. test = object()
  171. verbosity = object()
  172. result = object()
  173. default_handler = signal.getsignal(signal.SIGINT)
  174. class FakeRunner(object):
  175. initArgs = []
  176. runArgs = []
  177. def __init__(self, *args, **kwargs):
  178. self.initArgs.append((args, kwargs))
  179. def run(self, test):
  180. self.runArgs.append(test)
  181. return result
  182. class Program(unittest.TestProgram):
  183. def __init__(self, catchbreak):
  184. self.exit = False
  185. self.verbosity = verbosity
  186. self.failfast = failfast
  187. self.catchbreak = catchbreak
  188. self.tb_locals = False
  189. self.testRunner = FakeRunner
  190. self.test = test
  191. self.result = None
  192. p = Program(False)
  193. p.runTests()
  194. self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
  195. 'verbosity': verbosity,
  196. 'failfast': failfast,
  197. 'tb_locals': False,
  198. 'warnings': None})])
  199. self.assertEqual(FakeRunner.runArgs, [test])
  200. self.assertEqual(p.result, result)
  201. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  202. FakeRunner.initArgs = []
  203. FakeRunner.runArgs = []
  204. p = Program(True)
  205. p.runTests()
  206. self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
  207. 'verbosity': verbosity,
  208. 'failfast': failfast,
  209. 'tb_locals': False,
  210. 'warnings': None})])
  211. self.assertEqual(FakeRunner.runArgs, [test])
  212. self.assertEqual(p.result, result)
  213. self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
  214. def testRemoveHandler(self):
  215. default_handler = signal.getsignal(signal.SIGINT)
  216. unittest.installHandler()
  217. unittest.removeHandler()
  218. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  219. # check that calling removeHandler multiple times has no ill-effect
  220. unittest.removeHandler()
  221. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  222. def testRemoveHandlerAsDecorator(self):
  223. default_handler = signal.getsignal(signal.SIGINT)
  224. unittest.installHandler()
  225. @unittest.removeHandler
  226. def test():
  227. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  228. test()
  229. self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
  230. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  231. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  232. class TestBreakDefaultIntHandler(TestBreak):
  233. int_handler = signal.default_int_handler
  234. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  235. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  236. class TestBreakSignalIgnored(TestBreak):
  237. int_handler = signal.SIG_IGN
  238. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  239. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  240. class TestBreakSignalDefault(TestBreak):
  241. int_handler = signal.SIG_DFL
  242. if __name__ == "__main__":
  243. unittest.main()