# test_signal.py (51 KB)
  1. import enum
  2. import errno
  3. import inspect
  4. import os
  5. import random
  6. import signal
  7. import socket
  8. import statistics
  9. import subprocess
  10. import sys
  11. import threading
  12. import time
  13. import unittest
  14. from test import support
  15. from test.support import os_helper
  16. from test.support.script_helper import assert_python_ok, spawn_python
  17. from test.support import threading_helper
  18. try:
  19. import _testcapi
  20. except ImportError:
  21. _testcapi = None
  22. class GenericTests(unittest.TestCase):
  23. def test_enums(self):
  24. for name in dir(signal):
  25. sig = getattr(signal, name)
  26. if name in {'SIG_DFL', 'SIG_IGN'}:
  27. self.assertIsInstance(sig, signal.Handlers)
  28. elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
  29. self.assertIsInstance(sig, signal.Sigmasks)
  30. elif name.startswith('SIG') and not name.startswith('SIG_'):
  31. self.assertIsInstance(sig, signal.Signals)
  32. elif name.startswith('CTRL_'):
  33. self.assertIsInstance(sig, signal.Signals)
  34. self.assertEqual(sys.platform, "win32")
  35. CheckedSignals = enum._old_convert_(
  36. enum.IntEnum, 'Signals', 'signal',
  37. lambda name:
  38. name.isupper()
  39. and (name.startswith('SIG') and not name.startswith('SIG_'))
  40. or name.startswith('CTRL_'),
  41. source=signal,
  42. )
  43. enum._test_simple_enum(CheckedSignals, signal.Signals)
  44. CheckedHandlers = enum._old_convert_(
  45. enum.IntEnum, 'Handlers', 'signal',
  46. lambda name: name in ('SIG_DFL', 'SIG_IGN'),
  47. source=signal,
  48. )
  49. enum._test_simple_enum(CheckedHandlers, signal.Handlers)
  50. Sigmasks = getattr(signal, 'Sigmasks', None)
  51. if Sigmasks is not None:
  52. CheckedSigmasks = enum._old_convert_(
  53. enum.IntEnum, 'Sigmasks', 'signal',
  54. lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
  55. source=signal,
  56. )
  57. enum._test_simple_enum(CheckedSigmasks, Sigmasks)
  58. def test_functions_module_attr(self):
  59. # Issue #27718: If __all__ is not defined all non-builtin functions
  60. # should have correct __module__ to be displayed by pydoc.
  61. for name in dir(signal):
  62. value = getattr(signal, name)
  63. if inspect.isroutine(value) and not inspect.isbuiltin(value):
  64. self.assertEqual(value.__module__, 'signal')
  65. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  66. class PosixTests(unittest.TestCase):
  67. def trivial_signal_handler(self, *args):
  68. pass
  69. def test_out_of_range_signal_number_raises_error(self):
  70. self.assertRaises(ValueError, signal.getsignal, 4242)
  71. self.assertRaises(ValueError, signal.signal, 4242,
  72. self.trivial_signal_handler)
  73. self.assertRaises(ValueError, signal.strsignal, 4242)
  74. def test_setting_signal_handler_to_none_raises_error(self):
  75. self.assertRaises(TypeError, signal.signal,
  76. signal.SIGUSR1, None)
  77. def test_getsignal(self):
  78. hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
  79. self.assertIsInstance(hup, signal.Handlers)
  80. self.assertEqual(signal.getsignal(signal.SIGHUP),
  81. self.trivial_signal_handler)
  82. signal.signal(signal.SIGHUP, hup)
  83. self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
  84. def test_strsignal(self):
  85. self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
  86. self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
  87. self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
  88. # Issue 3864, unknown if this affects earlier versions of freebsd also
  89. def test_interprocess_signal(self):
  90. dirname = os.path.dirname(__file__)
  91. script = os.path.join(dirname, 'signalinterproctester.py')
  92. assert_python_ok(script)
  93. @unittest.skipUnless(
  94. hasattr(signal, "valid_signals"),
  95. "requires signal.valid_signals"
  96. )
  97. def test_valid_signals(self):
  98. s = signal.valid_signals()
  99. self.assertIsInstance(s, set)
  100. self.assertIn(signal.Signals.SIGINT, s)
  101. self.assertIn(signal.Signals.SIGALRM, s)
  102. self.assertNotIn(0, s)
  103. self.assertNotIn(signal.NSIG, s)
  104. self.assertLess(len(s), signal.NSIG)
  105. # gh-91145: Make sure that all SIGxxx constants exposed by the Python
  106. # signal module have a number in the [0; signal.NSIG-1] range.
  107. for name in dir(signal):
  108. if not name.startswith("SIG"):
  109. continue
  110. if name in {"SIG_IGN", "SIG_DFL"}:
  111. # SIG_IGN and SIG_DFL are pointers
  112. continue
  113. with self.subTest(name=name):
  114. signum = getattr(signal, name)
  115. self.assertGreaterEqual(signum, 0)
  116. self.assertLess(signum, signal.NSIG)
  117. @unittest.skipUnless(sys.executable, "sys.executable required.")
  118. @support.requires_subprocess()
  119. def test_keyboard_interrupt_exit_code(self):
  120. """KeyboardInterrupt triggers exit via SIGINT."""
  121. process = subprocess.run(
  122. [sys.executable, "-c",
  123. "import os, signal, time\n"
  124. "os.kill(os.getpid(), signal.SIGINT)\n"
  125. "for _ in range(999): time.sleep(0.01)"],
  126. stderr=subprocess.PIPE)
  127. self.assertIn(b"KeyboardInterrupt", process.stderr)
  128. self.assertEqual(process.returncode, -signal.SIGINT)
  129. # Caveat: The exit code is insufficient to guarantee we actually died
  130. # via a signal. POSIX shells do more than look at the 8 bit value.
  131. # Writing an automation friendly test of an interactive shell
  132. # to confirm that our process died via a SIGINT proved too complex.
  133. @unittest.skipUnless(sys.platform == "win32", "Windows specific")
  134. class WindowsSignalTests(unittest.TestCase):
  135. def test_valid_signals(self):
  136. s = signal.valid_signals()
  137. self.assertIsInstance(s, set)
  138. self.assertGreaterEqual(len(s), 6)
  139. self.assertIn(signal.Signals.SIGINT, s)
  140. self.assertNotIn(0, s)
  141. self.assertNotIn(signal.NSIG, s)
  142. self.assertLess(len(s), signal.NSIG)
  143. def test_issue9324(self):
  144. # Updated for issue #10003, adding SIGBREAK
  145. handler = lambda x, y: None
  146. checked = set()
  147. for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
  148. signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
  149. signal.SIGTERM):
  150. # Set and then reset a handler for signals that work on windows.
  151. # Issue #18396, only for signals without a C-level handler.
  152. if signal.getsignal(sig) is not None:
  153. signal.signal(sig, signal.signal(sig, handler))
  154. checked.add(sig)
  155. # Issue #18396: Ensure the above loop at least tested *something*
  156. self.assertTrue(checked)
  157. with self.assertRaises(ValueError):
  158. signal.signal(-1, handler)
  159. with self.assertRaises(ValueError):
  160. signal.signal(7, handler)
  161. @unittest.skipUnless(sys.executable, "sys.executable required.")
  162. @support.requires_subprocess()
  163. def test_keyboard_interrupt_exit_code(self):
  164. """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
  165. # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
  166. # as that requires setting up a console control handler in a child
  167. # in its own process group. Doable, but quite complicated. (see
  168. # @eryksun on https://github.com/python/cpython/pull/11862)
  169. process = subprocess.run(
  170. [sys.executable, "-c", "raise KeyboardInterrupt"],
  171. stderr=subprocess.PIPE)
  172. self.assertIn(b"KeyboardInterrupt", process.stderr)
  173. STATUS_CONTROL_C_EXIT = 0xC000013A
  174. self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
  175. class WakeupFDTests(unittest.TestCase):
  176. def test_invalid_call(self):
  177. # First parameter is positional-only
  178. with self.assertRaises(TypeError):
  179. signal.set_wakeup_fd(signum=signal.SIGINT)
  180. # warn_on_full_buffer is a keyword-only parameter
  181. with self.assertRaises(TypeError):
  182. signal.set_wakeup_fd(signal.SIGINT, False)
  183. def test_invalid_fd(self):
  184. fd = os_helper.make_bad_fd()
  185. self.assertRaises((ValueError, OSError),
  186. signal.set_wakeup_fd, fd)
  187. @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
  188. def test_invalid_socket(self):
  189. sock = socket.socket()
  190. fd = sock.fileno()
  191. sock.close()
  192. self.assertRaises((ValueError, OSError),
  193. signal.set_wakeup_fd, fd)
  194. # Emscripten does not support fstat on pipes yet.
  195. # https://github.com/emscripten-core/emscripten/issues/16414
  196. @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
  197. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  198. def test_set_wakeup_fd_result(self):
  199. r1, w1 = os.pipe()
  200. self.addCleanup(os.close, r1)
  201. self.addCleanup(os.close, w1)
  202. r2, w2 = os.pipe()
  203. self.addCleanup(os.close, r2)
  204. self.addCleanup(os.close, w2)
  205. if hasattr(os, 'set_blocking'):
  206. os.set_blocking(w1, False)
  207. os.set_blocking(w2, False)
  208. signal.set_wakeup_fd(w1)
  209. self.assertEqual(signal.set_wakeup_fd(w2), w1)
  210. self.assertEqual(signal.set_wakeup_fd(-1), w2)
  211. self.assertEqual(signal.set_wakeup_fd(-1), -1)
  212. @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
  213. @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
  214. def test_set_wakeup_fd_socket_result(self):
  215. sock1 = socket.socket()
  216. self.addCleanup(sock1.close)
  217. sock1.setblocking(False)
  218. fd1 = sock1.fileno()
  219. sock2 = socket.socket()
  220. self.addCleanup(sock2.close)
  221. sock2.setblocking(False)
  222. fd2 = sock2.fileno()
  223. signal.set_wakeup_fd(fd1)
  224. self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
  225. self.assertEqual(signal.set_wakeup_fd(-1), fd2)
  226. self.assertEqual(signal.set_wakeup_fd(-1), -1)
  227. # On Windows, files are always blocking and Windows does not provide a
  228. # function to test if a socket is in non-blocking mode.
  229. @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
  230. @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
  231. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  232. def test_set_wakeup_fd_blocking(self):
  233. rfd, wfd = os.pipe()
  234. self.addCleanup(os.close, rfd)
  235. self.addCleanup(os.close, wfd)
  236. # fd must be non-blocking
  237. os.set_blocking(wfd, True)
  238. with self.assertRaises(ValueError) as cm:
  239. signal.set_wakeup_fd(wfd)
  240. self.assertEqual(str(cm.exception),
  241. "the fd %s must be in non-blocking mode" % wfd)
  242. # non-blocking is ok
  243. os.set_blocking(wfd, False)
  244. signal.set_wakeup_fd(wfd)
  245. signal.set_wakeup_fd(-1)
  246. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  247. class WakeupSignalTests(unittest.TestCase):
  248. @unittest.skipIf(_testcapi is None, 'need _testcapi')
  249. def check_wakeup(self, test_body, *signals, ordered=True):
  250. # use a subprocess to have only one thread
  251. code = """if 1:
  252. import _testcapi
  253. import os
  254. import signal
  255. import struct
  256. signals = {!r}
  257. def handler(signum, frame):
  258. pass
  259. def check_signum(signals):
  260. data = os.read(read, len(signals)+1)
  261. raised = struct.unpack('%uB' % len(data), data)
  262. if not {!r}:
  263. raised = set(raised)
  264. signals = set(signals)
  265. if raised != signals:
  266. raise Exception("%r != %r" % (raised, signals))
  267. {}
  268. signal.signal(signal.SIGALRM, handler)
  269. read, write = os.pipe()
  270. os.set_blocking(write, False)
  271. signal.set_wakeup_fd(write)
  272. test()
  273. check_signum(signals)
  274. os.close(read)
  275. os.close(write)
  276. """.format(tuple(map(int, signals)), ordered, test_body)
  277. assert_python_ok('-c', code)
  278. @unittest.skipIf(_testcapi is None, 'need _testcapi')
  279. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  280. def test_wakeup_write_error(self):
  281. # Issue #16105: write() errors in the C signal handler should not
  282. # pass silently.
  283. # Use a subprocess to have only one thread.
  284. code = """if 1:
  285. import _testcapi
  286. import errno
  287. import os
  288. import signal
  289. import sys
  290. from test.support import captured_stderr
  291. def handler(signum, frame):
  292. 1/0
  293. signal.signal(signal.SIGALRM, handler)
  294. r, w = os.pipe()
  295. os.set_blocking(r, False)
  296. # Set wakeup_fd a read-only file descriptor to trigger the error
  297. signal.set_wakeup_fd(r)
  298. try:
  299. with captured_stderr() as err:
  300. signal.raise_signal(signal.SIGALRM)
  301. except ZeroDivisionError:
  302. # An ignored exception should have been printed out on stderr
  303. err = err.getvalue()
  304. if ('Exception ignored when trying to write to the signal wakeup fd'
  305. not in err):
  306. raise AssertionError(err)
  307. if ('OSError: [Errno %d]' % errno.EBADF) not in err:
  308. raise AssertionError(err)
  309. else:
  310. raise AssertionError("ZeroDivisionError not raised")
  311. os.close(r)
  312. os.close(w)
  313. """
  314. r, w = os.pipe()
  315. try:
  316. os.write(r, b'x')
  317. except OSError:
  318. pass
  319. else:
  320. self.skipTest("OS doesn't report write() error on the read end of a pipe")
  321. finally:
  322. os.close(r)
  323. os.close(w)
  324. assert_python_ok('-c', code)
  325. def test_wakeup_fd_early(self):
  326. self.check_wakeup("""def test():
  327. import select
  328. import time
  329. TIMEOUT_FULL = 10
  330. TIMEOUT_HALF = 5
  331. class InterruptSelect(Exception):
  332. pass
  333. def handler(signum, frame):
  334. raise InterruptSelect
  335. signal.signal(signal.SIGALRM, handler)
  336. signal.alarm(1)
  337. # We attempt to get a signal during the sleep,
  338. # before select is called
  339. try:
  340. select.select([], [], [], TIMEOUT_FULL)
  341. except InterruptSelect:
  342. pass
  343. else:
  344. raise Exception("select() was not interrupted")
  345. before_time = time.monotonic()
  346. select.select([read], [], [], TIMEOUT_FULL)
  347. after_time = time.monotonic()
  348. dt = after_time - before_time
  349. if dt >= TIMEOUT_HALF:
  350. raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
  351. """, signal.SIGALRM)
  352. def test_wakeup_fd_during(self):
  353. self.check_wakeup("""def test():
  354. import select
  355. import time
  356. TIMEOUT_FULL = 10
  357. TIMEOUT_HALF = 5
  358. class InterruptSelect(Exception):
  359. pass
  360. def handler(signum, frame):
  361. raise InterruptSelect
  362. signal.signal(signal.SIGALRM, handler)
  363. signal.alarm(1)
  364. before_time = time.monotonic()
  365. # We attempt to get a signal during the select call
  366. try:
  367. select.select([read], [], [], TIMEOUT_FULL)
  368. except InterruptSelect:
  369. pass
  370. else:
  371. raise Exception("select() was not interrupted")
  372. after_time = time.monotonic()
  373. dt = after_time - before_time
  374. if dt >= TIMEOUT_HALF:
  375. raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
  376. """, signal.SIGALRM)
  377. def test_signum(self):
  378. self.check_wakeup("""def test():
  379. signal.signal(signal.SIGUSR1, handler)
  380. signal.raise_signal(signal.SIGUSR1)
  381. signal.raise_signal(signal.SIGALRM)
  382. """, signal.SIGUSR1, signal.SIGALRM)
  383. @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
  384. 'need signal.pthread_sigmask()')
  385. def test_pending(self):
  386. self.check_wakeup("""def test():
  387. signum1 = signal.SIGUSR1
  388. signum2 = signal.SIGUSR2
  389. signal.signal(signum1, handler)
  390. signal.signal(signum2, handler)
  391. signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
  392. signal.raise_signal(signum1)
  393. signal.raise_signal(signum2)
  394. # Unblocking the 2 signals calls the C signal handler twice
  395. signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
  396. """, signal.SIGUSR1, signal.SIGUSR2, ordered=False)
  397. @unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
  398. class WakeupSocketSignalTests(unittest.TestCase):
  399. @unittest.skipIf(_testcapi is None, 'need _testcapi')
  400. def test_socket(self):
  401. # use a subprocess to have only one thread
  402. code = """if 1:
  403. import signal
  404. import socket
  405. import struct
  406. import _testcapi
  407. signum = signal.SIGINT
  408. signals = (signum,)
  409. def handler(signum, frame):
  410. pass
  411. signal.signal(signum, handler)
  412. read, write = socket.socketpair()
  413. write.setblocking(False)
  414. signal.set_wakeup_fd(write.fileno())
  415. signal.raise_signal(signum)
  416. data = read.recv(1)
  417. if not data:
  418. raise Exception("no signum written")
  419. raised = struct.unpack('B', data)
  420. if raised != signals:
  421. raise Exception("%r != %r" % (raised, signals))
  422. read.close()
  423. write.close()
  424. """
  425. assert_python_ok('-c', code)
  426. @unittest.skipIf(_testcapi is None, 'need _testcapi')
  427. def test_send_error(self):
  428. # Use a subprocess to have only one thread.
  429. if os.name == 'nt':
  430. action = 'send'
  431. else:
  432. action = 'write'
  433. code = """if 1:
  434. import errno
  435. import signal
  436. import socket
  437. import sys
  438. import time
  439. import _testcapi
  440. from test.support import captured_stderr
  441. signum = signal.SIGINT
  442. def handler(signum, frame):
  443. pass
  444. signal.signal(signum, handler)
  445. read, write = socket.socketpair()
  446. read.setblocking(False)
  447. write.setblocking(False)
  448. signal.set_wakeup_fd(write.fileno())
  449. # Close sockets: send() will fail
  450. read.close()
  451. write.close()
  452. with captured_stderr() as err:
  453. signal.raise_signal(signum)
  454. err = err.getvalue()
  455. if ('Exception ignored when trying to {action} to the signal wakeup fd'
  456. not in err):
  457. raise AssertionError(err)
  458. """.format(action=action)
  459. assert_python_ok('-c', code)
  460. @unittest.skipIf(_testcapi is None, 'need _testcapi')
  461. def test_warn_on_full_buffer(self):
  462. # Use a subprocess to have only one thread.
  463. if os.name == 'nt':
  464. action = 'send'
  465. else:
  466. action = 'write'
  467. code = """if 1:
  468. import errno
  469. import signal
  470. import socket
  471. import sys
  472. import time
  473. import _testcapi
  474. from test.support import captured_stderr
  475. signum = signal.SIGINT
  476. # This handler will be called, but we intentionally won't read from
  477. # the wakeup fd.
  478. def handler(signum, frame):
  479. pass
  480. signal.signal(signum, handler)
  481. read, write = socket.socketpair()
  482. # Fill the socketpair buffer
  483. if sys.platform == 'win32':
  484. # bpo-34130: On Windows, sometimes non-blocking send fails to fill
  485. # the full socketpair buffer, so use a timeout of 50 ms instead.
  486. write.settimeout(0.050)
  487. else:
  488. write.setblocking(False)
  489. written = 0
  490. if sys.platform == "vxworks":
  491. CHUNK_SIZES = (1,)
  492. else:
  493. # Start with large chunk size to reduce the
  494. # number of send needed to fill the buffer.
  495. CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
  496. for chunk_size in CHUNK_SIZES:
  497. chunk = b"x" * chunk_size
  498. try:
  499. while True:
  500. write.send(chunk)
  501. written += chunk_size
  502. except (BlockingIOError, TimeoutError):
  503. pass
  504. print(f"%s bytes written into the socketpair" % written, flush=True)
  505. write.setblocking(False)
  506. try:
  507. write.send(b"x")
  508. except BlockingIOError:
  509. # The socketpair buffer seems full
  510. pass
  511. else:
  512. raise AssertionError("%s bytes failed to fill the socketpair "
  513. "buffer" % written)
  514. # By default, we get a warning when a signal arrives
  515. msg = ('Exception ignored when trying to {action} '
  516. 'to the signal wakeup fd')
  517. signal.set_wakeup_fd(write.fileno())
  518. with captured_stderr() as err:
  519. signal.raise_signal(signum)
  520. err = err.getvalue()
  521. if msg not in err:
  522. raise AssertionError("first set_wakeup_fd() test failed, "
  523. "stderr: %r" % err)
  524. # And also if warn_on_full_buffer=True
  525. signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
  526. with captured_stderr() as err:
  527. signal.raise_signal(signum)
  528. err = err.getvalue()
  529. if msg not in err:
  530. raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
  531. "test failed, stderr: %r" % err)
  532. # But not if warn_on_full_buffer=False
  533. signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
  534. with captured_stderr() as err:
  535. signal.raise_signal(signum)
  536. err = err.getvalue()
  537. if err != "":
  538. raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
  539. "test failed, stderr: %r" % err)
  540. # And then check the default again, to make sure warn_on_full_buffer
  541. # settings don't leak across calls.
  542. signal.set_wakeup_fd(write.fileno())
  543. with captured_stderr() as err:
  544. signal.raise_signal(signum)
  545. err = err.getvalue()
  546. if msg not in err:
  547. raise AssertionError("second set_wakeup_fd() test failed, "
  548. "stderr: %r" % err)
  549. """.format(action=action)
  550. assert_python_ok('-c', code)
  551. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  552. @unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
  553. @support.requires_subprocess()
  554. @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
  555. class SiginterruptTest(unittest.TestCase):
  556. def readpipe_interrupted(self, interrupt):
  557. """Perform a read during which a signal will arrive. Return True if the
  558. read is interrupted by the signal and raises an exception. Return False
  559. if it returns normally.
  560. """
  561. # use a subprocess to have only one thread, to have a timeout on the
  562. # blocking read and to not touch signal handling in this process
  563. code = """if 1:
  564. import errno
  565. import os
  566. import signal
  567. import sys
  568. interrupt = %r
  569. r, w = os.pipe()
  570. def handler(signum, frame):
  571. 1 / 0
  572. signal.signal(signal.SIGALRM, handler)
  573. if interrupt is not None:
  574. signal.siginterrupt(signal.SIGALRM, interrupt)
  575. print("ready")
  576. sys.stdout.flush()
  577. # run the test twice
  578. try:
  579. for loop in range(2):
  580. # send a SIGALRM in a second (during the read)
  581. signal.alarm(1)
  582. try:
  583. # blocking call: read from a pipe without data
  584. os.read(r, 1)
  585. except ZeroDivisionError:
  586. pass
  587. else:
  588. sys.exit(2)
  589. sys.exit(3)
  590. finally:
  591. os.close(r)
  592. os.close(w)
  593. """ % (interrupt,)
  594. with spawn_python('-c', code) as process:
  595. try:
  596. # wait until the child process is loaded and has started
  597. first_line = process.stdout.readline()
  598. stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT)
  599. except subprocess.TimeoutExpired:
  600. process.kill()
  601. return False
  602. else:
  603. stdout = first_line + stdout
  604. exitcode = process.wait()
  605. if exitcode not in (2, 3):
  606. raise Exception("Child error (exit code %s): %r"
  607. % (exitcode, stdout))
  608. return (exitcode == 3)
  609. def test_without_siginterrupt(self):
  610. # If a signal handler is installed and siginterrupt is not called
  611. # at all, when that signal arrives, it interrupts a syscall that's in
  612. # progress.
  613. interrupted = self.readpipe_interrupted(None)
  614. self.assertTrue(interrupted)
  615. def test_siginterrupt_on(self):
  616. # If a signal handler is installed and siginterrupt is called with
  617. # a true value for the second argument, when that signal arrives, it
  618. # interrupts a syscall that's in progress.
  619. interrupted = self.readpipe_interrupted(True)
  620. self.assertTrue(interrupted)
  621. def test_siginterrupt_off(self):
  622. # If a signal handler is installed and siginterrupt is called with
  623. # a false value for the second argument, when that signal arrives, it
  624. # does not interrupt a syscall that's in progress.
  625. interrupted = self.readpipe_interrupted(False)
  626. self.assertFalse(interrupted)
  627. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  628. @unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
  629. "needs signal.getitimer() and signal.setitimer()")
  630. class ItimerTest(unittest.TestCase):
  631. def setUp(self):
  632. self.hndl_called = False
  633. self.hndl_count = 0
  634. self.itimer = None
  635. self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
  636. def tearDown(self):
  637. signal.signal(signal.SIGALRM, self.old_alarm)
  638. if self.itimer is not None: # test_itimer_exc doesn't change this attr
  639. # just ensure that itimer is stopped
  640. signal.setitimer(self.itimer, 0)
  641. def sig_alrm(self, *args):
  642. self.hndl_called = True
  643. def sig_vtalrm(self, *args):
  644. self.hndl_called = True
  645. if self.hndl_count > 3:
  646. # it shouldn't be here, because it should have been disabled.
  647. raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
  648. "timer.")
  649. elif self.hndl_count == 3:
  650. # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
  651. signal.setitimer(signal.ITIMER_VIRTUAL, 0)
  652. self.hndl_count += 1
  653. def sig_prof(self, *args):
  654. self.hndl_called = True
  655. signal.setitimer(signal.ITIMER_PROF, 0)
  656. def test_itimer_exc(self):
  657. # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
  658. # defines it ?
  659. self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
  660. # Negative times are treated as zero on some platforms.
  661. if 0:
  662. self.assertRaises(signal.ItimerError,
  663. signal.setitimer, signal.ITIMER_REAL, -1)
  664. def test_itimer_real(self):
  665. self.itimer = signal.ITIMER_REAL
  666. signal.setitimer(self.itimer, 1.0)
  667. signal.pause()
  668. self.assertEqual(self.hndl_called, True)
  # Issue 3864, unknown if this affects earlier versions of freebsd also
  @unittest.skipIf(sys.platform in ('netbsd5',),
      'itimer not reliable (does not mix well with threading) on some BSDs.')
  def test_itimer_virtual(self):
      # Arm a repeating ITIMER_VIRTUAL timer handled by sig_vtalrm, then
      # keep the CPU busy until getitimer() reports the timer as disarmed.
      # The test is skipped if that has not happened within 60 seconds.
      self.itimer = signal.ITIMER_VIRTUAL
      signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
      signal.setitimer(self.itimer, 0.3, 0.2)

      disarmed = (0.0, 0.0)
      began = time.monotonic()
      while True:
          if time.monotonic() - began >= 60.0:
              # Issue 8424
              self.skipTest("timeout: likely cause: machine too slow or load too "
                            "high")
          # use up some virtual time by doing real work
          _ = pow(12345, 67890, 10000019)
          if signal.getitimer(self.itimer) == disarmed:
              # sig_vtalrm handler stopped this itimer
              break

      # virtual itimer should be (0.0, 0.0) now
      self.assertEqual(signal.getitimer(self.itimer), disarmed)
      # and the handler should have been called
      self.assertEqual(self.hndl_called, True)
  def test_itimer_prof(self):
      # Arm a repeating ITIMER_PROF timer handled by sig_prof, then keep
      # the CPU busy until getitimer() reports the timer as disarmed.
      # The test is skipped if that has not happened within 60 seconds.
      self.itimer = signal.ITIMER_PROF
      signal.signal(signal.SIGPROF, self.sig_prof)
      signal.setitimer(self.itimer, 0.2, 0.2)

      disarmed = (0.0, 0.0)
      began = time.monotonic()
      while True:
          if time.monotonic() - began >= 60.0:
              # Issue 8424
              self.skipTest("timeout: likely cause: machine too slow or load too "
                            "high")
          # do some work
          _ = pow(12345, 67890, 10000019)
          if signal.getitimer(self.itimer) == disarmed:
              # sig_prof handler stopped this itimer
              break

      # profiling itimer should be (0.0, 0.0) now
      self.assertEqual(signal.getitimer(self.itimer), disarmed)
      # and the handler should have been called
      self.assertEqual(self.hndl_called, True)
  def test_setitimer_tiny(self):
      # bpo-30807: C setitimer() takes a microsecond-resolution interval.
      # Check that float -> timeval conversion doesn't round
      # the interval down to zero, which would disable the timer.
      tiny_interval = 1e-6
      real_timer = signal.ITIMER_REAL
      self.itimer = real_timer
      signal.setitimer(real_timer, tiny_interval)
      # Sleep for a second, then check that hndl_called was set to True.
      time.sleep(1)
      self.assertEqual(self.hndl_called, True)
  class PendingSignalsTests(unittest.TestCase):
      """
      Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
      functions.
      """

      @unittest.skipUnless(hasattr(signal, 'sigpending'),
                           'need signal.sigpending()')
      def test_sigpending_empty(self):
          # sigpending() is expected to return an empty set here.
          self.assertEqual(signal.sigpending(), set())

      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                           'need signal.pthread_sigmask()')
      @unittest.skipUnless(hasattr(signal, 'sigpending'),
                           'need signal.sigpending()')
      def test_sigpending(self):
          # Child script: block SIGUSR1, send it to itself, check that
          # sigpending() reports exactly that signal (as signal.Signals
          # members), then unblock it and expect the handler's
          # ZeroDivisionError.
          code = """if 1:
              import os
              import signal
              def handler(signum, frame):
                  1/0
              signum = signal.SIGUSR1
              signal.signal(signum, handler)
              signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
              os.kill(os.getpid(), signum)
              pending = signal.sigpending()
              for sig in pending:
                  assert isinstance(sig, signal.Signals), repr(pending)
              if pending != {signum}:
                  raise Exception('%s != {%s}' % (pending, signum))
              try:
                  signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
              except ZeroDivisionError:
                  pass
              else:
                  raise Exception("ZeroDivisionError not raised")
          """
          assert_python_ok('-c', code)

      @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                           'need signal.pthread_kill()')
      @threading_helper.requires_working_threading()
      def test_pthread_kill(self):
          # Child script: send SIGUSR1 to its own thread with pthread_kill()
          # and expect the handler's ZeroDivisionError to surface from that
          # call.
          code = """if 1:
              import signal
              import threading
              import sys
              signum = signal.SIGUSR1
              def handler(signum, frame):
                  1/0
              signal.signal(signum, handler)
              tid = threading.get_ident()
              try:
                  signal.pthread_kill(tid, signum)
              except ZeroDivisionError:
                  pass
              else:
                  raise Exception("ZeroDivisionError not raised")
          """
          assert_python_ok('-c', code)

      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                           'need signal.pthread_sigmask()')
      def wait_helper(self, blocked, test):
          """
          test: body of the "def test(signum):" function.
          blocked: number of the blocked signal

          The child script installs a SIGALRM handler that raises
          ZeroDivisionError, blocks ``blocked``, runs test(signum) and then
          unblocks ``blocked``; it exits with status 1 if the handler ran on
          unblock or if any other exception escaped.
          """
          # The template body shares its indentation with the first line of
          # the ``test`` snippets passed in by the callers, because
          # test.strip() only removes the indentation of the first line.
          code = '''if 1:
          import signal
          import sys
          from signal import Signals
          def handler(signum, frame):
              1/0
          %s
          blocked = %s
          signum = signal.SIGALRM
          # child: block and wait the signal
          try:
              signal.signal(signum, handler)
              signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
              # Do the tests
              test(signum)
              # The handler must not be called on unblock
              try:
                  signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
              except ZeroDivisionError:
                  print("the signal handler has been called",
                        file=sys.stderr)
                  sys.exit(1)
          except BaseException as err:
              print("error: {}".format(err), file=sys.stderr)
              sys.stderr.flush()
              sys.exit(1)
          ''' % (test.strip(), blocked)

          # sig*wait* must be called with the signal blocked: since the current
          # process might have several threads running, use a subprocess to have
          # a single thread.
          assert_python_ok('-c', code)

      @unittest.skipUnless(hasattr(signal, 'sigwait'),
                           'need signal.sigwait()')
      def test_sigwait(self):
          # sigwait() must return the awaited signal as a signal.Signals member.
          self.wait_helper(signal.SIGALRM, '''
          def test(signum):
              signal.alarm(1)
              received = signal.sigwait([signum])
              assert isinstance(received, signal.Signals), received
              if received != signum:
                  raise Exception('received %s, not %s' % (received, signum))
          ''')

      @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                           'need signal.sigwaitinfo()')
      def test_sigwaitinfo(self):
          # sigwaitinfo() must report the awaited signal in si_signo.
          self.wait_helper(signal.SIGALRM, '''
          def test(signum):
              signal.alarm(1)
              info = signal.sigwaitinfo([signum])
              if info.si_signo != signum:
                  raise Exception("info.si_signo != %s" % signum)
          ''')

      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                           'need signal.sigtimedwait()')
      def test_sigtimedwait(self):
          # sigtimedwait() with a timeout longer than the one-second alarm
          # must report the awaited signal in si_signo.
          self.wait_helper(signal.SIGALRM, '''
          def test(signum):
              signal.alarm(1)
              info = signal.sigtimedwait([signum], 10.1000)
              if info.si_signo != signum:
                  raise Exception('info.si_signo != %s' % signum)
          ''')

      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                           'need signal.sigtimedwait()')
      def test_sigtimedwait_poll(self):
          # check that polling with sigtimedwait works
          self.wait_helper(signal.SIGALRM, '''
          def test(signum):
              import os
              os.kill(os.getpid(), signum)
              info = signal.sigtimedwait([signum], 0)
              if info.si_signo != signum:
                  raise Exception('info.si_signo != %s' % signum)
          ''')

      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                           'need signal.sigtimedwait()')
      def test_sigtimedwait_timeout(self):
          # No signal is sent in the snippet: sigtimedwait() must return None.
          self.wait_helper(signal.SIGALRM, '''
          def test(signum):
              received = signal.sigtimedwait([signum], 1.0)
              if received is not None:
                  raise Exception("received=%r" % (received,))
          ''')

      @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                           'need signal.sigtimedwait()')
      def test_sigtimedwait_negative_timeout(self):
          # A negative timeout must be rejected with ValueError.
          signum = signal.SIGALRM
          self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)

      @unittest.skipUnless(hasattr(signal, 'sigwait'),
                           'need signal.sigwait()')
      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                           'need signal.pthread_sigmask()')
      @threading_helper.requires_working_threading()
      def test_sigwait_thread(self):
          # Check that calling sigwait() from a thread doesn't suspend the whole
          # process. A new interpreter is spawned to avoid problems when mixing
          # threads and fork(): only async-safe functions are allowed between
          # fork() and exec().
          assert_python_ok("-c", """if True:
              import os, threading, sys, time, signal
              # the default handler terminates the process
              signum = signal.SIGUSR1
              def kill_later():
                  # wait until the main thread is waiting in sigwait()
                  time.sleep(1)
                  os.kill(os.getpid(), signum)
              # the signal must be blocked by all the threads
              signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
              killer = threading.Thread(target=kill_later)
              killer.start()
              received = signal.sigwait([signum])
              if received != signum:
                  print("sigwait() received %s, not %s" % (received, signum),
                        file=sys.stderr)
                  sys.exit(1)
              killer.join()
              # unblock the signal, which should have been cleared by sigwait()
              signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
          """)

      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                           'need signal.pthread_sigmask()')
      def test_pthread_sigmask_arguments(self):
          # Wrong argument counts raise TypeError, an invalid "how" value
          # raises OSError and out-of-range signal numbers raise ValueError.
          self.assertRaises(TypeError, signal.pthread_sigmask)
          self.assertRaises(TypeError, signal.pthread_sigmask, 1)
          self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
          self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
          with self.assertRaises(ValueError):
              signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
          with self.assertRaises(ValueError):
              signal.pthread_sigmask(signal.SIG_BLOCK, [0])
          with self.assertRaises(ValueError):
              signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])

      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                           'need signal.pthread_sigmask()')
      def test_pthread_sigmask_valid_signals(self):
          # Block every valid signal; the cleanup restores the mask that was
          # in effect before this test.
          s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
          self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
          # Get current blocked set
          s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
          self.assertLessEqual(s, signal.valid_signals())

      @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                           'need signal.pthread_sigmask()')
      @threading_helper.requires_working_threading()
      def test_pthread_sigmask(self):
          # Child script: exercise SIG_BLOCK / SIG_UNBLOCK on SIGUSR1 and
          # check both handler invocation and the returned masks at each step.
          code = """if 1:
              import signal
              import os; import threading
              def handler(signum, frame):
                  1/0
              def kill(signum):
                  os.kill(os.getpid(), signum)
              def check_mask(mask):
                  for sig in mask:
                      assert isinstance(sig, signal.Signals), repr(sig)
              def read_sigmask():
                  sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
                  check_mask(sigmask)
                  return sigmask
              signum = signal.SIGUSR1
              # Install our signal handler
              old_handler = signal.signal(signum, handler)
              # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
              old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
              check_mask(old_mask)
              try:
                  kill(signum)
              except ZeroDivisionError:
                  pass
              else:
                  raise Exception("ZeroDivisionError not raised")
              # Block and then raise SIGUSR1. The signal is blocked: the signal
              # handler is not called, and the signal is now pending
              mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
              check_mask(mask)
              kill(signum)
              # Check the new mask
              blocked = read_sigmask()
              check_mask(blocked)
              if signum not in blocked:
                  raise Exception("%s not in %s" % (signum, blocked))
              if old_mask ^ blocked != {signum}:
                  raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
              # Unblock SIGUSR1
              try:
                  # unblock the pending signal calls immediately the signal handler
                  signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
              except ZeroDivisionError:
                  pass
              else:
                  raise Exception("ZeroDivisionError not raised")
              try:
                  kill(signum)
              except ZeroDivisionError:
                  pass
              else:
                  raise Exception("ZeroDivisionError not raised")
              # Check the new mask
              unblocked = read_sigmask()
              if signum in unblocked:
                  raise Exception("%s in %s" % (signum, unblocked))
              if blocked ^ unblocked != {signum}:
                  raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
              if old_mask != unblocked:
                  raise Exception("%s != %s" % (old_mask, unblocked))
          """
          assert_python_ok('-c', code)

      @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                           'need signal.pthread_kill()')
      @threading_helper.requires_working_threading()
      def test_pthread_kill_main_thread(self):
          # Test that a signal can be sent to the main thread with pthread_kill()
          # before any other thread has been created (see issue #12392).
          code = """if True:
              import threading
              import signal
              import sys
              def handler(signum, frame):
                  sys.exit(3)
              signal.signal(signal.SIGUSR1, handler)
              signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
              sys.exit(2)
          """

          with spawn_python('-c', code) as process:
              stdout, _ = process.communicate()
              exitcode = process.wait()
              # Exit code 3 means the handler ran; 2 means the script reached
              # its end without the handler being called.  Report a mismatch
              # as a test failure rather than as a generic error.
              self.assertEqual(exitcode, 3,
                               "Child error (exit code %s): %s"
                               % (exitcode, stdout))
  class StressTest(unittest.TestCase):
      """
      Stress signal delivery, especially when a signal arrives in
      the middle of recomputing the signal state or executing
      previously tripped signal handlers.
      """

      def setsig(self, signum, handler):
          """Install *handler* for *signum* and restore the old one on cleanup."""
          old_handler = signal.signal(signum, handler)
          self.addCleanup(signal.signal, signum, old_handler)

      def measure_itimer_resolution(self):
          """Return the median delay, in seconds, between consecutive
          SIGALRM deliveries of a 1 µs ITIMER_REAL timer (20 samples)."""
          N = 20
          times = []

          def handler(signum=None, frame=None):
              if len(times) < N:
                  times.append(time.perf_counter())
                  # 1 µs is the smallest possible timer interval,
                  # we want to measure what the concrete duration
                  # will be on this platform
                  signal.setitimer(signal.ITIMER_REAL, 1e-6)

          self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
          self.setsig(signal.SIGALRM, handler)
          # The first direct call records a timestamp and arms the timer;
          # every later sample comes from a SIGALRM delivery.
          handler()
          while len(times) < N:
              time.sleep(1e-3)

          durations = [later - earlier
                       for earlier, later in zip(times, times[1:])]
          med = statistics.median(durations)
          if support.verbose:
              print("detected median itimer() resolution: %.6f s." % (med,))
          return med

      def decide_itimer_count(self):
          """Return how many sequential timers to use for the stress tests,
          or skip the test when the measured resolution is above 10 ms."""
          # Some systems have poor setitimer() resolution (for example
          # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
          # number of sequential timers based on that.
          reso = self.measure_itimer_resolution()
          if reso <= 1e-4:
              return 10000
          elif reso <= 1e-2:
              return 100
          else:
              self.skipTest("detected itimer resolution (%.3f s.) too high "
                            "(> 10 ms.) on this platform (or system too busy)"
                            % (reso,))

      @unittest.skipUnless(hasattr(signal, "setitimer"),
                           "test needs setitimer()")
      def test_stress_delivery_dependent(self):
          """
          This test uses dependent signal handlers.
          """
          N = self.decide_itimer_count()
          sigs = []

          def first_handler(signum, frame):
              # 1e-6 is the minimum non-zero value for `setitimer()`.
              # Choose a random delay so as to improve chances of
              # triggering a race condition.  Ideally the signal is received
              # when inside critical signal-handling routines such as
              # Py_MakePendingCalls().
              signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)

          def second_handler(signum=None, frame=None):
              sigs.append(signum)

          # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
          # ascending and descending sequences (SIGUSR1 then SIGALRM,
          # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
          self.setsig(signal.SIGPROF, first_handler)
          self.setsig(signal.SIGUSR1, first_handler)
          self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL

          expected_sigs = 0
          deadline = time.monotonic() + support.SHORT_TIMEOUT

          while expected_sigs < N:
              os.kill(os.getpid(), signal.SIGPROF)
              expected_sigs += 1
              # Wait for handlers to run to avoid signal coalescing
              while len(sigs) < expected_sigs and time.monotonic() < deadline:
                  time.sleep(1e-5)

              os.kill(os.getpid(), signal.SIGUSR1)
              expected_sigs += 1
              while len(sigs) < expected_sigs and time.monotonic() < deadline:
                  time.sleep(1e-5)

          # All ITIMER_REAL signals should have been delivered to the
          # Python handler
          self.assertEqual(len(sigs), N, "Some signals were lost")

      @unittest.skipUnless(hasattr(signal, "setitimer"),
                           "test needs setitimer()")
      def test_stress_delivery_simultaneous(self):
          """
          This test uses simultaneous signal handlers.
          """
          N = self.decide_itimer_count()
          sigs = []

          def handler(signum, frame):
              sigs.append(signum)

          self.setsig(signal.SIGUSR1, handler)
          self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL

          expected_sigs = 0
          deadline = time.monotonic() + support.SHORT_TIMEOUT

          while expected_sigs < N:
              # Hopefully the SIGALRM will be received somewhere during
              # initial processing of SIGUSR1.
              signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
              os.kill(os.getpid(), signal.SIGUSR1)

              expected_sigs += 2
              # Wait for handlers to run to avoid signal coalescing
              while len(sigs) < expected_sigs and time.monotonic() < deadline:
                  time.sleep(1e-5)

          # All ITIMER_REAL signals should have been delivered to the
          # Python handler
          self.assertEqual(len(sigs), N, "Some signals were lost")

      @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
                           "test needs SIGUSR1")
      @threading_helper.requires_working_threading()
      def test_stress_modifying_handlers(self):
          # bpo-43406: race condition between trip_signal() and signal.signal
          signum = signal.SIGUSR1
          num_sent_signals = 0
          num_received_signals = 0
          do_stop = False

          def custom_handler(signum, frame):
              nonlocal num_received_signals
              num_received_signals += 1

          def set_interrupts():
              nonlocal num_sent_signals
              while not do_stop:
                  signal.raise_signal(signum)
                  num_sent_signals += 1

          def cycle_handlers():
              # Keep cycling until enough signals were sent AND at least one
              # was handled by custom_handler.  Signals raised while SIG_IGN
              # is installed are dropped, so stopping on the sent count alone
              # could make the "some signals were received" check below fail
              # spuriously.
              while num_sent_signals < 100 or num_received_signals < 1:
                  for _ in range(20000):
                      # Cycle between a Python-defined and a non-Python handler
                      for handler in [custom_handler, signal.SIG_IGN]:
                          signal.signal(signum, handler)

          old_handler = signal.signal(signum, custom_handler)
          self.addCleanup(signal.signal, signum, old_handler)

          t = threading.Thread(target=set_interrupts)
          try:
              ignored = False
              with support.catch_unraisable_exception() as cm:
                  t.start()
                  cycle_handlers()
                  # Stop and join the sender inside the "with" block, before
                  # cm.unraisable is inspected.
                  do_stop = True
                  t.join()

                  if cm.unraisable is not None:
                      # An unraisable exception may be printed out when
                      # a signal is ignored due to the aforementioned
                      # race condition, check it.
                      self.assertIsInstance(cm.unraisable.exc_value, OSError)
                      self.assertIn(
                          f"Signal {signum:d} ignored due to race condition",
                          str(cm.unraisable.exc_value))
                      ignored = True

              # bpo-43406: Even if it is unlikely, it's technically possible that
              # all signals were ignored because of race conditions.
              if not ignored:
                  # Sanity check that some signals were received, but not all
                  self.assertGreater(num_received_signals, 0)
              self.assertLess(num_received_signals, num_sent_signals)
          finally:
              # Make sure the sender thread stops even when a check above failed.
              do_stop = True
              t.join()
  class RaiseSignalTest(unittest.TestCase):
      """Tests for signal.raise_signal()."""

      def test_sigint(self):
          # Raising SIGINT is expected to surface as KeyboardInterrupt.
          with self.assertRaises(KeyboardInterrupt):
              signal.raise_signal(signal.SIGINT)

      @unittest.skipIf(sys.platform != "win32", "Windows specific test")
      def test_invalid_argument(self):
          # Raising an unsupported signal number must fail with EINVAL.
          SIGHUP = 1  # not supported on win32
          with self.assertRaises(OSError,
                                 msg="OSError (Invalid argument) expected") as cm:
              signal.raise_signal(SIGHUP)
          self.assertEqual(cm.exception.errno, errno.EINVAL)

      def test_handler(self):
          # A Python-level handler installed for SIGINT must be called by
          # raise_signal(); the previous handler is restored on cleanup.
          is_ok = False

          def handler(a, b):
              nonlocal is_ok
              is_ok = True

          old_signal = signal.signal(signal.SIGINT, handler)
          self.addCleanup(signal.signal, signal.SIGINT, old_signal)

          signal.raise_signal(signal.SIGINT)
          self.assertTrue(is_ok)
  class PidfdSignalTest(unittest.TestCase):
      """Tests for signal.pidfd_send_signal()."""

      @unittest.skipUnless(
          hasattr(signal, "pidfd_send_signal"),
          "pidfd support not built in",
      )
      def test_pidfd_send_signal(self):
          # File descriptor 0 is passed as the pidfd first: the call must
          # fail with OSError, and the errno decides whether to skip.
          with self.assertRaises(OSError) as cm:
              signal.pidfd_send_signal(0, signal.SIGINT)
          if cm.exception.errno == errno.ENOSYS:
              self.skipTest("kernel does not support pidfds")
          elif cm.exception.errno == errno.EPERM:
              self.skipTest("Not enough privileges to use pidfds")
          self.assertEqual(cm.exception.errno, errno.EBADF)

          # Use this process's own /proc/<pid> directory as the pidfd.
          my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
          self.addCleanup(os.close, my_pidfd)

          # Any siginfo other than None must be rejected.
          with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
              signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)

          # Sending SIGINT to ourselves must surface as KeyboardInterrupt.
          with self.assertRaises(KeyboardInterrupt):
              signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
  def tearDownModule():
      """Module-level teardown: call support.reap_children()."""
      reap = support.reap_children
      reap()
  # Run the tests through unittest when this file is executed as a script.
  if __name__ == "__main__":
      unittest.main()