test_threadsignals.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. """PyUnit testing that threads honor our signal semantics"""
  2. import unittest
  3. import signal
  4. import os
  5. import sys
  6. from test.support import threading_helper
  7. import _thread as thread
  8. import time
  9. if (sys.platform[:3] == 'win'):
  10. raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
  11. process_pid = os.getpid()
  12. signalled_all=thread.allocate_lock()
  13. USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
  14. and sys.thread_info.lock == 'mutex+cond')
  15. def registerSignals(for_usr1, for_usr2, for_alrm):
  16. usr1 = signal.signal(signal.SIGUSR1, for_usr1)
  17. usr2 = signal.signal(signal.SIGUSR2, for_usr2)
  18. alrm = signal.signal(signal.SIGALRM, for_alrm)
  19. return usr1, usr2, alrm
  20. # The signal handler. Just note that the signal occurred and
  21. # from who.
  22. def handle_signals(sig,frame):
  23. signal_blackboard[sig]['tripped'] += 1
  24. signal_blackboard[sig]['tripped_by'] = thread.get_ident()
  25. # a function that will be spawned as a separate thread.
  26. def send_signals():
  27. os.kill(process_pid, signal.SIGUSR1)
  28. os.kill(process_pid, signal.SIGUSR2)
  29. signalled_all.release()
  30. @threading_helper.requires_working_threading()
  31. @unittest.skipUnless(hasattr(signal, "alarm"), "test requires signal.alarm")
  32. class ThreadSignals(unittest.TestCase):
  33. def test_signals(self):
  34. with threading_helper.wait_threads_exit():
  35. # Test signal handling semantics of threads.
  36. # We spawn a thread, have the thread send two signals, and
  37. # wait for it to finish. Check that we got both signals
  38. # and that they were run by the main thread.
  39. signalled_all.acquire()
  40. self.spawnSignallingThread()
  41. signalled_all.acquire()
  42. # the signals that we asked the kernel to send
  43. # will come back, but we don't know when.
  44. # (it might even be after the thread exits
  45. # and might be out of order.) If we haven't seen
  46. # the signals yet, send yet another signal and
  47. # wait for it return.
  48. if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
  49. or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
  50. try:
  51. signal.alarm(1)
  52. signal.pause()
  53. finally:
  54. signal.alarm(0)
  55. self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
  56. self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
  57. thread.get_ident())
  58. self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
  59. self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
  60. thread.get_ident())
  61. signalled_all.release()
  62. def spawnSignallingThread(self):
  63. thread.start_new_thread(send_signals, ())
  64. def alarm_interrupt(self, sig, frame):
  65. raise KeyboardInterrupt
  66. @unittest.skipIf(USING_PTHREAD_COND,
  67. 'POSIX condition variables cannot be interrupted')
  68. @unittest.skipIf(sys.platform.startswith('linux') and
  69. not sys.thread_info.version,
  70. 'Issue 34004: musl does not allow interruption of locks '
  71. 'by signals.')
  72. # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
  73. @unittest.skipIf(sys.platform.startswith('openbsd'),
  74. 'lock cannot be interrupted on OpenBSD')
  75. def test_lock_acquire_interruption(self):
  76. # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
  77. # in a deadlock.
  78. # XXX this test can fail when the legacy (non-semaphore) implementation
  79. # of locks is used in thread_pthread.h, see issue #11223.
  80. oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
  81. try:
  82. lock = thread.allocate_lock()
  83. lock.acquire()
  84. signal.alarm(1)
  85. t1 = time.monotonic()
  86. self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
  87. dt = time.monotonic() - t1
  88. # Checking that KeyboardInterrupt was raised is not sufficient.
  89. # We want to assert that lock.acquire() was interrupted because
  90. # of the signal, not that the signal handler was called immediately
  91. # after timeout return of lock.acquire() (which can fool assertRaises).
  92. self.assertLess(dt, 3.0)
  93. finally:
  94. signal.alarm(0)
  95. signal.signal(signal.SIGALRM, oldalrm)
  96. @unittest.skipIf(USING_PTHREAD_COND,
  97. 'POSIX condition variables cannot be interrupted')
  98. @unittest.skipIf(sys.platform.startswith('linux') and
  99. not sys.thread_info.version,
  100. 'Issue 34004: musl does not allow interruption of locks '
  101. 'by signals.')
  102. # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
  103. @unittest.skipIf(sys.platform.startswith('openbsd'),
  104. 'lock cannot be interrupted on OpenBSD')
  105. def test_rlock_acquire_interruption(self):
  106. # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
  107. # in a deadlock.
  108. # XXX this test can fail when the legacy (non-semaphore) implementation
  109. # of locks is used in thread_pthread.h, see issue #11223.
  110. oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
  111. try:
  112. rlock = thread.RLock()
  113. # For reentrant locks, the initial acquisition must be in another
  114. # thread.
  115. def other_thread():
  116. rlock.acquire()
  117. with threading_helper.wait_threads_exit():
  118. thread.start_new_thread(other_thread, ())
  119. # Wait until we can't acquire it without blocking...
  120. while rlock.acquire(blocking=False):
  121. rlock.release()
  122. time.sleep(0.01)
  123. signal.alarm(1)
  124. t1 = time.monotonic()
  125. self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
  126. dt = time.monotonic() - t1
  127. # See rationale above in test_lock_acquire_interruption
  128. self.assertLess(dt, 3.0)
  129. finally:
  130. signal.alarm(0)
  131. signal.signal(signal.SIGALRM, oldalrm)
  132. def acquire_retries_on_intr(self, lock):
  133. self.sig_recvd = False
  134. def my_handler(signal, frame):
  135. self.sig_recvd = True
  136. old_handler = signal.signal(signal.SIGUSR1, my_handler)
  137. try:
  138. def other_thread():
  139. # Acquire the lock in a non-main thread, so this test works for
  140. # RLocks.
  141. lock.acquire()
  142. # Wait until the main thread is blocked in the lock acquire, and
  143. # then wake it up with this.
  144. time.sleep(0.5)
  145. os.kill(process_pid, signal.SIGUSR1)
  146. # Let the main thread take the interrupt, handle it, and retry
  147. # the lock acquisition. Then we'll let it run.
  148. time.sleep(0.5)
  149. lock.release()
  150. with threading_helper.wait_threads_exit():
  151. thread.start_new_thread(other_thread, ())
  152. # Wait until we can't acquire it without blocking...
  153. while lock.acquire(blocking=False):
  154. lock.release()
  155. time.sleep(0.01)
  156. result = lock.acquire() # Block while we receive a signal.
  157. self.assertTrue(self.sig_recvd)
  158. self.assertTrue(result)
  159. finally:
  160. signal.signal(signal.SIGUSR1, old_handler)
  161. def test_lock_acquire_retries_on_intr(self):
  162. self.acquire_retries_on_intr(thread.allocate_lock())
  163. def test_rlock_acquire_retries_on_intr(self):
  164. self.acquire_retries_on_intr(thread.RLock())
  165. def test_interrupted_timed_acquire(self):
  166. # Test to make sure we recompute lock acquisition timeouts when we
  167. # receive a signal. Check this by repeatedly interrupting a lock
  168. # acquire in the main thread, and make sure that the lock acquire times
  169. # out after the right amount of time.
  170. # NOTE: this test only behaves as expected if C signals get delivered
  171. # to the main thread. Otherwise lock.acquire() itself doesn't get
  172. # interrupted and the test trivially succeeds.
  173. self.start = None
  174. self.end = None
  175. self.sigs_recvd = 0
  176. done = thread.allocate_lock()
  177. done.acquire()
  178. lock = thread.allocate_lock()
  179. lock.acquire()
  180. def my_handler(signum, frame):
  181. self.sigs_recvd += 1
  182. old_handler = signal.signal(signal.SIGUSR1, my_handler)
  183. try:
  184. def timed_acquire():
  185. self.start = time.monotonic()
  186. lock.acquire(timeout=0.5)
  187. self.end = time.monotonic()
  188. def send_signals():
  189. for _ in range(40):
  190. time.sleep(0.02)
  191. os.kill(process_pid, signal.SIGUSR1)
  192. done.release()
  193. with threading_helper.wait_threads_exit():
  194. # Send the signals from the non-main thread, since the main thread
  195. # is the only one that can process signals.
  196. thread.start_new_thread(send_signals, ())
  197. timed_acquire()
  198. # Wait for thread to finish
  199. done.acquire()
  200. # This allows for some timing and scheduling imprecision
  201. self.assertLess(self.end - self.start, 2.0)
  202. self.assertGreater(self.end - self.start, 0.3)
  203. # If the signal is received several times before PyErr_CheckSignals()
  204. # is called, the handler will get called less than 40 times. Just
  205. # check it's been called at least once.
  206. self.assertGreater(self.sigs_recvd, 0)
  207. finally:
  208. signal.signal(signal.SIGUSR1, old_handler)
  209. def setUpModule():
  210. global signal_blackboard
  211. signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
  212. signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
  213. signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }
  214. oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
  215. unittest.addModuleCleanup(registerSignals, *oldsigs)
  216. if __name__ == '__main__':
  217. unittest.main()