| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442 |
- import enum
- import errno
- import inspect
- import os
- import random
- import signal
- import socket
- import statistics
- import subprocess
- import sys
- import threading
- import time
- import unittest
- from test import support
- from test.support import os_helper
- from test.support.script_helper import assert_python_ok, spawn_python
- from test.support import threading_helper
- try:
- import _testcapi
- except ImportError:
- _testcapi = None
- class GenericTests(unittest.TestCase):
- def test_enums(self):
- for name in dir(signal):
- sig = getattr(signal, name)
- if name in {'SIG_DFL', 'SIG_IGN'}:
- self.assertIsInstance(sig, signal.Handlers)
- elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
- self.assertIsInstance(sig, signal.Sigmasks)
- elif name.startswith('SIG') and not name.startswith('SIG_'):
- self.assertIsInstance(sig, signal.Signals)
- elif name.startswith('CTRL_'):
- self.assertIsInstance(sig, signal.Signals)
- self.assertEqual(sys.platform, "win32")
- CheckedSignals = enum._old_convert_(
- enum.IntEnum, 'Signals', 'signal',
- lambda name:
- name.isupper()
- and (name.startswith('SIG') and not name.startswith('SIG_'))
- or name.startswith('CTRL_'),
- source=signal,
- )
- enum._test_simple_enum(CheckedSignals, signal.Signals)
- CheckedHandlers = enum._old_convert_(
- enum.IntEnum, 'Handlers', 'signal',
- lambda name: name in ('SIG_DFL', 'SIG_IGN'),
- source=signal,
- )
- enum._test_simple_enum(CheckedHandlers, signal.Handlers)
- Sigmasks = getattr(signal, 'Sigmasks', None)
- if Sigmasks is not None:
- CheckedSigmasks = enum._old_convert_(
- enum.IntEnum, 'Sigmasks', 'signal',
- lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
- source=signal,
- )
- enum._test_simple_enum(CheckedSigmasks, Sigmasks)
- def test_functions_module_attr(self):
- # Issue #27718: If __all__ is not defined all non-builtin functions
- # should have correct __module__ to be displayed by pydoc.
- for name in dir(signal):
- value = getattr(signal, name)
- if inspect.isroutine(value) and not inspect.isbuiltin(value):
- self.assertEqual(value.__module__, 'signal')
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- class PosixTests(unittest.TestCase):
- def trivial_signal_handler(self, *args):
- pass
- def test_out_of_range_signal_number_raises_error(self):
- self.assertRaises(ValueError, signal.getsignal, 4242)
- self.assertRaises(ValueError, signal.signal, 4242,
- self.trivial_signal_handler)
- self.assertRaises(ValueError, signal.strsignal, 4242)
- def test_setting_signal_handler_to_none_raises_error(self):
- self.assertRaises(TypeError, signal.signal,
- signal.SIGUSR1, None)
- def test_getsignal(self):
- hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
- self.assertIsInstance(hup, signal.Handlers)
- self.assertEqual(signal.getsignal(signal.SIGHUP),
- self.trivial_signal_handler)
- signal.signal(signal.SIGHUP, hup)
- self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
- def test_strsignal(self):
- self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
- self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
- self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
- # Issue 3864, unknown if this affects earlier versions of freebsd also
- def test_interprocess_signal(self):
- dirname = os.path.dirname(__file__)
- script = os.path.join(dirname, 'signalinterproctester.py')
- assert_python_ok(script)
- @unittest.skipUnless(
- hasattr(signal, "valid_signals"),
- "requires signal.valid_signals"
- )
- def test_valid_signals(self):
- s = signal.valid_signals()
- self.assertIsInstance(s, set)
- self.assertIn(signal.Signals.SIGINT, s)
- self.assertIn(signal.Signals.SIGALRM, s)
- self.assertNotIn(0, s)
- self.assertNotIn(signal.NSIG, s)
- self.assertLess(len(s), signal.NSIG)
- # gh-91145: Make sure that all SIGxxx constants exposed by the Python
- # signal module have a number in the [0; signal.NSIG-1] range.
- for name in dir(signal):
- if not name.startswith("SIG"):
- continue
- if name in {"SIG_IGN", "SIG_DFL"}:
- # SIG_IGN and SIG_DFL are pointers
- continue
- with self.subTest(name=name):
- signum = getattr(signal, name)
- self.assertGreaterEqual(signum, 0)
- self.assertLess(signum, signal.NSIG)
- @unittest.skipUnless(sys.executable, "sys.executable required.")
- @support.requires_subprocess()
- def test_keyboard_interrupt_exit_code(self):
- """KeyboardInterrupt triggers exit via SIGINT."""
- process = subprocess.run(
- [sys.executable, "-c",
- "import os, signal, time\n"
- "os.kill(os.getpid(), signal.SIGINT)\n"
- "for _ in range(999): time.sleep(0.01)"],
- stderr=subprocess.PIPE)
- self.assertIn(b"KeyboardInterrupt", process.stderr)
- self.assertEqual(process.returncode, -signal.SIGINT)
- # Caveat: The exit code is insufficient to guarantee we actually died
- # via a signal. POSIX shells do more than look at the 8 bit value.
- # Writing an automation friendly test of an interactive shell
- # to confirm that our process died via a SIGINT proved too complex.
- @unittest.skipUnless(sys.platform == "win32", "Windows specific")
- class WindowsSignalTests(unittest.TestCase):
- def test_valid_signals(self):
- s = signal.valid_signals()
- self.assertIsInstance(s, set)
- self.assertGreaterEqual(len(s), 6)
- self.assertIn(signal.Signals.SIGINT, s)
- self.assertNotIn(0, s)
- self.assertNotIn(signal.NSIG, s)
- self.assertLess(len(s), signal.NSIG)
- def test_issue9324(self):
- # Updated for issue #10003, adding SIGBREAK
- handler = lambda x, y: None
- checked = set()
- for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
- signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
- signal.SIGTERM):
- # Set and then reset a handler for signals that work on windows.
- # Issue #18396, only for signals without a C-level handler.
- if signal.getsignal(sig) is not None:
- signal.signal(sig, signal.signal(sig, handler))
- checked.add(sig)
- # Issue #18396: Ensure the above loop at least tested *something*
- self.assertTrue(checked)
- with self.assertRaises(ValueError):
- signal.signal(-1, handler)
- with self.assertRaises(ValueError):
- signal.signal(7, handler)
- @unittest.skipUnless(sys.executable, "sys.executable required.")
- @support.requires_subprocess()
- def test_keyboard_interrupt_exit_code(self):
- """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
- # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
- # as that requires setting up a console control handler in a child
- # in its own process group. Doable, but quite complicated. (see
- # @eryksun on https://github.com/python/cpython/pull/11862)
- process = subprocess.run(
- [sys.executable, "-c", "raise KeyboardInterrupt"],
- stderr=subprocess.PIPE)
- self.assertIn(b"KeyboardInterrupt", process.stderr)
- STATUS_CONTROL_C_EXIT = 0xC000013A
- self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
- class WakeupFDTests(unittest.TestCase):
- def test_invalid_call(self):
- # First parameter is positional-only
- with self.assertRaises(TypeError):
- signal.set_wakeup_fd(signum=signal.SIGINT)
- # warn_on_full_buffer is a keyword-only parameter
- with self.assertRaises(TypeError):
- signal.set_wakeup_fd(signal.SIGINT, False)
- def test_invalid_fd(self):
- fd = os_helper.make_bad_fd()
- self.assertRaises((ValueError, OSError),
- signal.set_wakeup_fd, fd)
- @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
- def test_invalid_socket(self):
- sock = socket.socket()
- fd = sock.fileno()
- sock.close()
- self.assertRaises((ValueError, OSError),
- signal.set_wakeup_fd, fd)
- # Emscripten does not support fstat on pipes yet.
- # https://github.com/emscripten-core/emscripten/issues/16414
- @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
- @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
- def test_set_wakeup_fd_result(self):
- r1, w1 = os.pipe()
- self.addCleanup(os.close, r1)
- self.addCleanup(os.close, w1)
- r2, w2 = os.pipe()
- self.addCleanup(os.close, r2)
- self.addCleanup(os.close, w2)
- if hasattr(os, 'set_blocking'):
- os.set_blocking(w1, False)
- os.set_blocking(w2, False)
- signal.set_wakeup_fd(w1)
- self.assertEqual(signal.set_wakeup_fd(w2), w1)
- self.assertEqual(signal.set_wakeup_fd(-1), w2)
- self.assertEqual(signal.set_wakeup_fd(-1), -1)
- @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
- @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
- def test_set_wakeup_fd_socket_result(self):
- sock1 = socket.socket()
- self.addCleanup(sock1.close)
- sock1.setblocking(False)
- fd1 = sock1.fileno()
- sock2 = socket.socket()
- self.addCleanup(sock2.close)
- sock2.setblocking(False)
- fd2 = sock2.fileno()
- signal.set_wakeup_fd(fd1)
- self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
- self.assertEqual(signal.set_wakeup_fd(-1), fd2)
- self.assertEqual(signal.set_wakeup_fd(-1), -1)
- # On Windows, files are always blocking and Windows does not provide a
- # function to test if a socket is in non-blocking mode.
- @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
- @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
- @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
- def test_set_wakeup_fd_blocking(self):
- rfd, wfd = os.pipe()
- self.addCleanup(os.close, rfd)
- self.addCleanup(os.close, wfd)
- # fd must be non-blocking
- os.set_blocking(wfd, True)
- with self.assertRaises(ValueError) as cm:
- signal.set_wakeup_fd(wfd)
- self.assertEqual(str(cm.exception),
- "the fd %s must be in non-blocking mode" % wfd)
- # non-blocking is ok
- os.set_blocking(wfd, False)
- signal.set_wakeup_fd(wfd)
- signal.set_wakeup_fd(-1)
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- class WakeupSignalTests(unittest.TestCase):
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- def check_wakeup(self, test_body, *signals, ordered=True):
- # use a subprocess to have only one thread
- code = """if 1:
- import _testcapi
- import os
- import signal
- import struct
- signals = {!r}
- def handler(signum, frame):
- pass
- def check_signum(signals):
- data = os.read(read, len(signals)+1)
- raised = struct.unpack('%uB' % len(data), data)
- if not {!r}:
- raised = set(raised)
- signals = set(signals)
- if raised != signals:
- raise Exception("%r != %r" % (raised, signals))
- {}
- signal.signal(signal.SIGALRM, handler)
- read, write = os.pipe()
- os.set_blocking(write, False)
- signal.set_wakeup_fd(write)
- test()
- check_signum(signals)
- os.close(read)
- os.close(write)
- """.format(tuple(map(int, signals)), ordered, test_body)
- assert_python_ok('-c', code)
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
- def test_wakeup_write_error(self):
- # Issue #16105: write() errors in the C signal handler should not
- # pass silently.
- # Use a subprocess to have only one thread.
- code = """if 1:
- import _testcapi
- import errno
- import os
- import signal
- import sys
- from test.support import captured_stderr
- def handler(signum, frame):
- 1/0
- signal.signal(signal.SIGALRM, handler)
- r, w = os.pipe()
- os.set_blocking(r, False)
- # Set wakeup_fd a read-only file descriptor to trigger the error
- signal.set_wakeup_fd(r)
- try:
- with captured_stderr() as err:
- signal.raise_signal(signal.SIGALRM)
- except ZeroDivisionError:
- # An ignored exception should have been printed out on stderr
- err = err.getvalue()
- if ('Exception ignored when trying to write to the signal wakeup fd'
- not in err):
- raise AssertionError(err)
- if ('OSError: [Errno %d]' % errno.EBADF) not in err:
- raise AssertionError(err)
- else:
- raise AssertionError("ZeroDivisionError not raised")
- os.close(r)
- os.close(w)
- """
- r, w = os.pipe()
- try:
- os.write(r, b'x')
- except OSError:
- pass
- else:
- self.skipTest("OS doesn't report write() error on the read end of a pipe")
- finally:
- os.close(r)
- os.close(w)
- assert_python_ok('-c', code)
- def test_wakeup_fd_early(self):
- self.check_wakeup("""def test():
- import select
- import time
- TIMEOUT_FULL = 10
- TIMEOUT_HALF = 5
- class InterruptSelect(Exception):
- pass
- def handler(signum, frame):
- raise InterruptSelect
- signal.signal(signal.SIGALRM, handler)
- signal.alarm(1)
- # We attempt to get a signal during the sleep,
- # before select is called
- try:
- select.select([], [], [], TIMEOUT_FULL)
- except InterruptSelect:
- pass
- else:
- raise Exception("select() was not interrupted")
- before_time = time.monotonic()
- select.select([read], [], [], TIMEOUT_FULL)
- after_time = time.monotonic()
- dt = after_time - before_time
- if dt >= TIMEOUT_HALF:
- raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
- """, signal.SIGALRM)
- def test_wakeup_fd_during(self):
- self.check_wakeup("""def test():
- import select
- import time
- TIMEOUT_FULL = 10
- TIMEOUT_HALF = 5
- class InterruptSelect(Exception):
- pass
- def handler(signum, frame):
- raise InterruptSelect
- signal.signal(signal.SIGALRM, handler)
- signal.alarm(1)
- before_time = time.monotonic()
- # We attempt to get a signal during the select call
- try:
- select.select([read], [], [], TIMEOUT_FULL)
- except InterruptSelect:
- pass
- else:
- raise Exception("select() was not interrupted")
- after_time = time.monotonic()
- dt = after_time - before_time
- if dt >= TIMEOUT_HALF:
- raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
- """, signal.SIGALRM)
- def test_signum(self):
- self.check_wakeup("""def test():
- signal.signal(signal.SIGUSR1, handler)
- signal.raise_signal(signal.SIGUSR1)
- signal.raise_signal(signal.SIGALRM)
- """, signal.SIGUSR1, signal.SIGALRM)
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- def test_pending(self):
- self.check_wakeup("""def test():
- signum1 = signal.SIGUSR1
- signum2 = signal.SIGUSR2
- signal.signal(signum1, handler)
- signal.signal(signum2, handler)
- signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
- signal.raise_signal(signum1)
- signal.raise_signal(signum2)
- # Unblocking the 2 signals calls the C signal handler twice
- signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
- """, signal.SIGUSR1, signal.SIGUSR2, ordered=False)
- @unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
- class WakeupSocketSignalTests(unittest.TestCase):
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- def test_socket(self):
- # use a subprocess to have only one thread
- code = """if 1:
- import signal
- import socket
- import struct
- import _testcapi
- signum = signal.SIGINT
- signals = (signum,)
- def handler(signum, frame):
- pass
- signal.signal(signum, handler)
- read, write = socket.socketpair()
- write.setblocking(False)
- signal.set_wakeup_fd(write.fileno())
- signal.raise_signal(signum)
- data = read.recv(1)
- if not data:
- raise Exception("no signum written")
- raised = struct.unpack('B', data)
- if raised != signals:
- raise Exception("%r != %r" % (raised, signals))
- read.close()
- write.close()
- """
- assert_python_ok('-c', code)
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- def test_send_error(self):
- # Use a subprocess to have only one thread.
- if os.name == 'nt':
- action = 'send'
- else:
- action = 'write'
- code = """if 1:
- import errno
- import signal
- import socket
- import sys
- import time
- import _testcapi
- from test.support import captured_stderr
- signum = signal.SIGINT
- def handler(signum, frame):
- pass
- signal.signal(signum, handler)
- read, write = socket.socketpair()
- read.setblocking(False)
- write.setblocking(False)
- signal.set_wakeup_fd(write.fileno())
- # Close sockets: send() will fail
- read.close()
- write.close()
- with captured_stderr() as err:
- signal.raise_signal(signum)
- err = err.getvalue()
- if ('Exception ignored when trying to {action} to the signal wakeup fd'
- not in err):
- raise AssertionError(err)
- """.format(action=action)
- assert_python_ok('-c', code)
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- def test_warn_on_full_buffer(self):
- # Use a subprocess to have only one thread.
- if os.name == 'nt':
- action = 'send'
- else:
- action = 'write'
- code = """if 1:
- import errno
- import signal
- import socket
- import sys
- import time
- import _testcapi
- from test.support import captured_stderr
- signum = signal.SIGINT
- # This handler will be called, but we intentionally won't read from
- # the wakeup fd.
- def handler(signum, frame):
- pass
- signal.signal(signum, handler)
- read, write = socket.socketpair()
- # Fill the socketpair buffer
- if sys.platform == 'win32':
- # bpo-34130: On Windows, sometimes non-blocking send fails to fill
- # the full socketpair buffer, so use a timeout of 50 ms instead.
- write.settimeout(0.050)
- else:
- write.setblocking(False)
- written = 0
- if sys.platform == "vxworks":
- CHUNK_SIZES = (1,)
- else:
- # Start with large chunk size to reduce the
- # number of send needed to fill the buffer.
- CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
- for chunk_size in CHUNK_SIZES:
- chunk = b"x" * chunk_size
- try:
- while True:
- write.send(chunk)
- written += chunk_size
- except (BlockingIOError, TimeoutError):
- pass
- print(f"%s bytes written into the socketpair" % written, flush=True)
- write.setblocking(False)
- try:
- write.send(b"x")
- except BlockingIOError:
- # The socketpair buffer seems full
- pass
- else:
- raise AssertionError("%s bytes failed to fill the socketpair "
- "buffer" % written)
- # By default, we get a warning when a signal arrives
- msg = ('Exception ignored when trying to {action} '
- 'to the signal wakeup fd')
- signal.set_wakeup_fd(write.fileno())
- with captured_stderr() as err:
- signal.raise_signal(signum)
- err = err.getvalue()
- if msg not in err:
- raise AssertionError("first set_wakeup_fd() test failed, "
- "stderr: %r" % err)
- # And also if warn_on_full_buffer=True
- signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
- with captured_stderr() as err:
- signal.raise_signal(signum)
- err = err.getvalue()
- if msg not in err:
- raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
- "test failed, stderr: %r" % err)
- # But not if warn_on_full_buffer=False
- signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
- with captured_stderr() as err:
- signal.raise_signal(signum)
- err = err.getvalue()
- if err != "":
- raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
- "test failed, stderr: %r" % err)
- # And then check the default again, to make sure warn_on_full_buffer
- # settings don't leak across calls.
- signal.set_wakeup_fd(write.fileno())
- with captured_stderr() as err:
- signal.raise_signal(signum)
- err = err.getvalue()
- if msg not in err:
- raise AssertionError("second set_wakeup_fd() test failed, "
- "stderr: %r" % err)
- """.format(action=action)
- assert_python_ok('-c', code)
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- @unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
- @support.requires_subprocess()
- @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
- class SiginterruptTest(unittest.TestCase):
- def readpipe_interrupted(self, interrupt):
- """Perform a read during which a signal will arrive. Return True if the
- read is interrupted by the signal and raises an exception. Return False
- if it returns normally.
- """
- # use a subprocess to have only one thread, to have a timeout on the
- # blocking read and to not touch signal handling in this process
- code = """if 1:
- import errno
- import os
- import signal
- import sys
- interrupt = %r
- r, w = os.pipe()
- def handler(signum, frame):
- 1 / 0
- signal.signal(signal.SIGALRM, handler)
- if interrupt is not None:
- signal.siginterrupt(signal.SIGALRM, interrupt)
- print("ready")
- sys.stdout.flush()
- # run the test twice
- try:
- for loop in range(2):
- # send a SIGALRM in a second (during the read)
- signal.alarm(1)
- try:
- # blocking call: read from a pipe without data
- os.read(r, 1)
- except ZeroDivisionError:
- pass
- else:
- sys.exit(2)
- sys.exit(3)
- finally:
- os.close(r)
- os.close(w)
- """ % (interrupt,)
- with spawn_python('-c', code) as process:
- try:
- # wait until the child process is loaded and has started
- first_line = process.stdout.readline()
- stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT)
- except subprocess.TimeoutExpired:
- process.kill()
- return False
- else:
- stdout = first_line + stdout
- exitcode = process.wait()
- if exitcode not in (2, 3):
- raise Exception("Child error (exit code %s): %r"
- % (exitcode, stdout))
- return (exitcode == 3)
- def test_without_siginterrupt(self):
- # If a signal handler is installed and siginterrupt is not called
- # at all, when that signal arrives, it interrupts a syscall that's in
- # progress.
- interrupted = self.readpipe_interrupted(None)
- self.assertTrue(interrupted)
- def test_siginterrupt_on(self):
- # If a signal handler is installed and siginterrupt is called with
- # a true value for the second argument, when that signal arrives, it
- # interrupts a syscall that's in progress.
- interrupted = self.readpipe_interrupted(True)
- self.assertTrue(interrupted)
- def test_siginterrupt_off(self):
- # If a signal handler is installed and siginterrupt is called with
- # a false value for the second argument, when that signal arrives, it
- # does not interrupt a syscall that's in progress.
- interrupted = self.readpipe_interrupted(False)
- self.assertFalse(interrupted)
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- @unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
- "needs signal.getitimer() and signal.setitimer()")
- class ItimerTest(unittest.TestCase):
- def setUp(self):
- self.hndl_called = False
- self.hndl_count = 0
- self.itimer = None
- self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
- def tearDown(self):
- signal.signal(signal.SIGALRM, self.old_alarm)
- if self.itimer is not None: # test_itimer_exc doesn't change this attr
- # just ensure that itimer is stopped
- signal.setitimer(self.itimer, 0)
- def sig_alrm(self, *args):
- self.hndl_called = True
- def sig_vtalrm(self, *args):
- self.hndl_called = True
- if self.hndl_count > 3:
- # it shouldn't be here, because it should have been disabled.
- raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
- "timer.")
- elif self.hndl_count == 3:
- # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
- signal.setitimer(signal.ITIMER_VIRTUAL, 0)
- self.hndl_count += 1
- def sig_prof(self, *args):
- self.hndl_called = True
- signal.setitimer(signal.ITIMER_PROF, 0)
- def test_itimer_exc(self):
- # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
- # defines it ?
- self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
- # Negative times are treated as zero on some platforms.
- if 0:
- self.assertRaises(signal.ItimerError,
- signal.setitimer, signal.ITIMER_REAL, -1)
- def test_itimer_real(self):
- self.itimer = signal.ITIMER_REAL
- signal.setitimer(self.itimer, 1.0)
- signal.pause()
- self.assertEqual(self.hndl_called, True)
- # Issue 3864, unknown if this affects earlier versions of freebsd also
- @unittest.skipIf(sys.platform in ('netbsd5',),
- 'itimer not reliable (does not mix well with threading) on some BSDs.')
- def test_itimer_virtual(self):
- self.itimer = signal.ITIMER_VIRTUAL
- signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
- signal.setitimer(self.itimer, 0.3, 0.2)
- start_time = time.monotonic()
- while time.monotonic() - start_time < 60.0:
- # use up some virtual time by doing real work
- _ = pow(12345, 67890, 10000019)
- if signal.getitimer(self.itimer) == (0.0, 0.0):
- break # sig_vtalrm handler stopped this itimer
- else: # Issue 8424
- self.skipTest("timeout: likely cause: machine too slow or load too "
- "high")
- # virtual itimer should be (0.0, 0.0) now
- self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
- # and the handler should have been called
- self.assertEqual(self.hndl_called, True)
- def test_itimer_prof(self):
- self.itimer = signal.ITIMER_PROF
- signal.signal(signal.SIGPROF, self.sig_prof)
- signal.setitimer(self.itimer, 0.2, 0.2)
- start_time = time.monotonic()
- while time.monotonic() - start_time < 60.0:
- # do some work
- _ = pow(12345, 67890, 10000019)
- if signal.getitimer(self.itimer) == (0.0, 0.0):
- break # sig_prof handler stopped this itimer
- else: # Issue 8424
- self.skipTest("timeout: likely cause: machine too slow or load too "
- "high")
- # profiling itimer should be (0.0, 0.0) now
- self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
- # and the handler should have been called
- self.assertEqual(self.hndl_called, True)
- def test_setitimer_tiny(self):
- # bpo-30807: C setitimer() takes a microsecond-resolution interval.
- # Check that float -> timeval conversion doesn't round
- # the interval down to zero, which would disable the timer.
- self.itimer = signal.ITIMER_REAL
- signal.setitimer(self.itimer, 1e-6)
- time.sleep(1)
- self.assertEqual(self.hndl_called, True)
- class PendingSignalsTests(unittest.TestCase):
- """
- Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
- functions.
- """
- @unittest.skipUnless(hasattr(signal, 'sigpending'),
- 'need signal.sigpending()')
- def test_sigpending_empty(self):
- self.assertEqual(signal.sigpending(), set())
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- @unittest.skipUnless(hasattr(signal, 'sigpending'),
- 'need signal.sigpending()')
- def test_sigpending(self):
- code = """if 1:
- import os
- import signal
- def handler(signum, frame):
- 1/0
- signum = signal.SIGUSR1
- signal.signal(signum, handler)
- signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- os.kill(os.getpid(), signum)
- pending = signal.sigpending()
- for sig in pending:
- assert isinstance(sig, signal.Signals), repr(pending)
- if pending != {signum}:
- raise Exception('%s != {%s}' % (pending, signum))
- try:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- """
- assert_python_ok('-c', code)
- @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
- 'need signal.pthread_kill()')
- @threading_helper.requires_working_threading()
- def test_pthread_kill(self):
- code = """if 1:
- import signal
- import threading
- import sys
- signum = signal.SIGUSR1
- def handler(signum, frame):
- 1/0
- signal.signal(signum, handler)
- tid = threading.get_ident()
- try:
- signal.pthread_kill(tid, signum)
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- """
- assert_python_ok('-c', code)
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- def wait_helper(self, blocked, test):
- """
- test: body of the "def test(signum):" function.
- blocked: number of the blocked signal
- """
- code = '''if 1:
- import signal
- import sys
- from signal import Signals
- def handler(signum, frame):
- 1/0
- %s
- blocked = %s
- signum = signal.SIGALRM
- # child: block and wait the signal
- try:
- signal.signal(signum, handler)
- signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
- # Do the tests
- test(signum)
- # The handler must not be called on unblock
- try:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
- except ZeroDivisionError:
- print("the signal handler has been called",
- file=sys.stderr)
- sys.exit(1)
- except BaseException as err:
- print("error: {}".format(err), file=sys.stderr)
- sys.stderr.flush()
- sys.exit(1)
- ''' % (test.strip(), blocked)
- # sig*wait* must be called with the signal blocked: since the current
- # process might have several threads running, use a subprocess to have
- # a single thread.
- assert_python_ok('-c', code)
- @unittest.skipUnless(hasattr(signal, 'sigwait'),
- 'need signal.sigwait()')
- def test_sigwait(self):
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- signal.alarm(1)
- received = signal.sigwait([signum])
- assert isinstance(received, signal.Signals), received
- if received != signum:
- raise Exception('received %s, not %s' % (received, signum))
- ''')
- @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
- 'need signal.sigwaitinfo()')
- def test_sigwaitinfo(self):
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- signal.alarm(1)
- info = signal.sigwaitinfo([signum])
- if info.si_signo != signum:
- raise Exception("info.si_signo != %s" % signum)
- ''')
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait(self):
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- signal.alarm(1)
- info = signal.sigtimedwait([signum], 10.1000)
- if info.si_signo != signum:
- raise Exception('info.si_signo != %s' % signum)
- ''')
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait_poll(self):
- # check that polling with sigtimedwait works
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- import os
- os.kill(os.getpid(), signum)
- info = signal.sigtimedwait([signum], 0)
- if info.si_signo != signum:
- raise Exception('info.si_signo != %s' % signum)
- ''')
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait_timeout(self):
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- received = signal.sigtimedwait([signum], 1.0)
- if received is not None:
- raise Exception("received=%r" % (received,))
- ''')
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait_negative_timeout(self):
- signum = signal.SIGALRM
- self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
- @unittest.skipUnless(hasattr(signal, 'sigwait'),
- 'need signal.sigwait()')
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- @threading_helper.requires_working_threading()
- def test_sigwait_thread(self):
- # Check that calling sigwait() from a thread doesn't suspend the whole
- # process. A new interpreter is spawned to avoid problems when mixing
- # threads and fork(): only async-safe functions are allowed between
- # fork() and exec().
- assert_python_ok("-c", """if True:
- import os, threading, sys, time, signal
- # the default handler terminates the process
- signum = signal.SIGUSR1
- def kill_later():
- # wait until the main thread is waiting in sigwait()
- time.sleep(1)
- os.kill(os.getpid(), signum)
- # the signal must be blocked by all the threads
- signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- killer = threading.Thread(target=kill_later)
- killer.start()
- received = signal.sigwait([signum])
- if received != signum:
- print("sigwait() received %s, not %s" % (received, signum),
- file=sys.stderr)
- sys.exit(1)
- killer.join()
- # unblock the signal, which should have been cleared by sigwait()
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- """)
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- def test_pthread_sigmask_arguments(self):
- self.assertRaises(TypeError, signal.pthread_sigmask)
- self.assertRaises(TypeError, signal.pthread_sigmask, 1)
- self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
- self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
- with self.assertRaises(ValueError):
- signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
- with self.assertRaises(ValueError):
- signal.pthread_sigmask(signal.SIG_BLOCK, [0])
- with self.assertRaises(ValueError):
- signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- def test_pthread_sigmask_valid_signals(self):
- s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
- self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
- # Get current blocked set
- s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
- self.assertLessEqual(s, signal.valid_signals())
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- @threading_helper.requires_working_threading()
- def test_pthread_sigmask(self):
- code = """if 1:
- import signal
- import os; import threading
- def handler(signum, frame):
- 1/0
- def kill(signum):
- os.kill(os.getpid(), signum)
- def check_mask(mask):
- for sig in mask:
- assert isinstance(sig, signal.Signals), repr(sig)
- def read_sigmask():
- sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
- check_mask(sigmask)
- return sigmask
- signum = signal.SIGUSR1
- # Install our signal handler
- old_handler = signal.signal(signum, handler)
- # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
- old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- check_mask(old_mask)
- try:
- kill(signum)
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- # Block and then raise SIGUSR1. The signal is blocked: the signal
- # handler is not called, and the signal is now pending
- mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- check_mask(mask)
- kill(signum)
- # Check the new mask
- blocked = read_sigmask()
- check_mask(blocked)
- if signum not in blocked:
- raise Exception("%s not in %s" % (signum, blocked))
- if old_mask ^ blocked != {signum}:
- raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
- # Unblock SIGUSR1
- try:
- # unblock the pending signal calls immediately the signal handler
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- try:
- kill(signum)
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- # Check the new mask
- unblocked = read_sigmask()
- if signum in unblocked:
- raise Exception("%s in %s" % (signum, unblocked))
- if blocked ^ unblocked != {signum}:
- raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
- if old_mask != unblocked:
- raise Exception("%s != %s" % (old_mask, unblocked))
- """
- assert_python_ok('-c', code)
- @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
- 'need signal.pthread_kill()')
- @threading_helper.requires_working_threading()
- def test_pthread_kill_main_thread(self):
- # Test that a signal can be sent to the main thread with pthread_kill()
- # before any other thread has been created (see issue #12392).
- code = """if True:
- import threading
- import signal
- import sys
- def handler(signum, frame):
- sys.exit(3)
- signal.signal(signal.SIGUSR1, handler)
- signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
- sys.exit(2)
- """
- with spawn_python('-c', code) as process:
- stdout, stderr = process.communicate()
- exitcode = process.wait()
- if exitcode != 3:
- raise Exception("Child error (exit code %s): %s" %
- (exitcode, stdout))
- class StressTest(unittest.TestCase):
- """
- Stress signal delivery, especially when a signal arrives in
- the middle of recomputing the signal state or executing
- previously tripped signal handlers.
- """
- def setsig(self, signum, handler):
- old_handler = signal.signal(signum, handler)
- self.addCleanup(signal.signal, signum, old_handler)
- def measure_itimer_resolution(self):
- N = 20
- times = []
- def handler(signum=None, frame=None):
- if len(times) < N:
- times.append(time.perf_counter())
- # 1 µs is the smallest possible timer interval,
- # we want to measure what the concrete duration
- # will be on this platform
- signal.setitimer(signal.ITIMER_REAL, 1e-6)
- self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
- self.setsig(signal.SIGALRM, handler)
- handler()
- while len(times) < N:
- time.sleep(1e-3)
- durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
- med = statistics.median(durations)
- if support.verbose:
- print("detected median itimer() resolution: %.6f s." % (med,))
- return med
- def decide_itimer_count(self):
- # Some systems have poor setitimer() resolution (for example
- # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
- # number of sequential timers based on that.
- reso = self.measure_itimer_resolution()
- if reso <= 1e-4:
- return 10000
- elif reso <= 1e-2:
- return 100
- else:
- self.skipTest("detected itimer resolution (%.3f s.) too high "
- "(> 10 ms.) on this platform (or system too busy)"
- % (reso,))
- @unittest.skipUnless(hasattr(signal, "setitimer"),
- "test needs setitimer()")
- def test_stress_delivery_dependent(self):
- """
- This test uses dependent signal handlers.
- """
- N = self.decide_itimer_count()
- sigs = []
- def first_handler(signum, frame):
- # 1e-6 is the minimum non-zero value for `setitimer()`.
- # Choose a random delay so as to improve chances of
- # triggering a race condition. Ideally the signal is received
- # when inside critical signal-handling routines such as
- # Py_MakePendingCalls().
- signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
- def second_handler(signum=None, frame=None):
- sigs.append(signum)
- # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both
- # ascending and descending sequences (SIGUSR1 then SIGALRM,
- # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
- self.setsig(signal.SIGPROF, first_handler)
- self.setsig(signal.SIGUSR1, first_handler)
- self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL
- expected_sigs = 0
- deadline = time.monotonic() + support.SHORT_TIMEOUT
- while expected_sigs < N:
- os.kill(os.getpid(), signal.SIGPROF)
- expected_sigs += 1
- # Wait for handlers to run to avoid signal coalescing
- while len(sigs) < expected_sigs and time.monotonic() < deadline:
- time.sleep(1e-5)
- os.kill(os.getpid(), signal.SIGUSR1)
- expected_sigs += 1
- while len(sigs) < expected_sigs and time.monotonic() < deadline:
- time.sleep(1e-5)
- # All ITIMER_REAL signals should have been delivered to the
- # Python handler
- self.assertEqual(len(sigs), N, "Some signals were lost")
- @unittest.skipUnless(hasattr(signal, "setitimer"),
- "test needs setitimer()")
- def test_stress_delivery_simultaneous(self):
- """
- This test uses simultaneous signal handlers.
- """
- N = self.decide_itimer_count()
- sigs = []
- def handler(signum, frame):
- sigs.append(signum)
- self.setsig(signal.SIGUSR1, handler)
- self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL
- expected_sigs = 0
- deadline = time.monotonic() + support.SHORT_TIMEOUT
- while expected_sigs < N:
- # Hopefully the SIGALRM will be received somewhere during
- # initial processing of SIGUSR1.
- signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
- os.kill(os.getpid(), signal.SIGUSR1)
- expected_sigs += 2
- # Wait for handlers to run to avoid signal coalescing
- while len(sigs) < expected_sigs and time.monotonic() < deadline:
- time.sleep(1e-5)
- # All ITIMER_REAL signals should have been delivered to the
- # Python handler
- self.assertEqual(len(sigs), N, "Some signals were lost")
- @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
- "test needs SIGUSR1")
- @threading_helper.requires_working_threading()
- def test_stress_modifying_handlers(self):
- # bpo-43406: race condition between trip_signal() and signal.signal
- signum = signal.SIGUSR1
- num_sent_signals = 0
- num_received_signals = 0
- do_stop = False
- def custom_handler(signum, frame):
- nonlocal num_received_signals
- num_received_signals += 1
- def set_interrupts():
- nonlocal num_sent_signals
- while not do_stop:
- signal.raise_signal(signum)
- num_sent_signals += 1
- def cycle_handlers():
- while num_sent_signals < 100:
- for i in range(20000):
- # Cycle between a Python-defined and a non-Python handler
- for handler in [custom_handler, signal.SIG_IGN]:
- signal.signal(signum, handler)
- old_handler = signal.signal(signum, custom_handler)
- self.addCleanup(signal.signal, signum, old_handler)
- t = threading.Thread(target=set_interrupts)
- try:
- ignored = False
- with support.catch_unraisable_exception() as cm:
- t.start()
- cycle_handlers()
- do_stop = True
- t.join()
- if cm.unraisable is not None:
- # An unraisable exception may be printed out when
- # a signal is ignored due to the aforementioned
- # race condition, check it.
- self.assertIsInstance(cm.unraisable.exc_value, OSError)
- self.assertIn(
- f"Signal {signum:d} ignored due to race condition",
- str(cm.unraisable.exc_value))
- ignored = True
- # bpo-43406: Even if it is unlikely, it's technically possible that
- # all signals were ignored because of race conditions.
- if not ignored:
- # Sanity check that some signals were received, but not all
- self.assertGreater(num_received_signals, 0)
- self.assertLess(num_received_signals, num_sent_signals)
- finally:
- do_stop = True
- t.join()
- class RaiseSignalTest(unittest.TestCase):
- def test_sigint(self):
- with self.assertRaises(KeyboardInterrupt):
- signal.raise_signal(signal.SIGINT)
- @unittest.skipIf(sys.platform != "win32", "Windows specific test")
- def test_invalid_argument(self):
- try:
- SIGHUP = 1 # not supported on win32
- signal.raise_signal(SIGHUP)
- self.fail("OSError (Invalid argument) expected")
- except OSError as e:
- if e.errno == errno.EINVAL:
- pass
- else:
- raise
- def test_handler(self):
- is_ok = False
- def handler(a, b):
- nonlocal is_ok
- is_ok = True
- old_signal = signal.signal(signal.SIGINT, handler)
- self.addCleanup(signal.signal, signal.SIGINT, old_signal)
- signal.raise_signal(signal.SIGINT)
- self.assertTrue(is_ok)
- class PidfdSignalTest(unittest.TestCase):
- @unittest.skipUnless(
- hasattr(signal, "pidfd_send_signal"),
- "pidfd support not built in",
- )
- def test_pidfd_send_signal(self):
- with self.assertRaises(OSError) as cm:
- signal.pidfd_send_signal(0, signal.SIGINT)
- if cm.exception.errno == errno.ENOSYS:
- self.skipTest("kernel does not support pidfds")
- elif cm.exception.errno == errno.EPERM:
- self.skipTest("Not enough privileges to use pidfs")
- self.assertEqual(cm.exception.errno, errno.EBADF)
- my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
- self.addCleanup(os.close, my_pidfd)
- with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
- signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
- with self.assertRaises(KeyboardInterrupt):
- signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
- def tearDownModule():
- support.reap_children()
- if __name__ == "__main__":
- unittest.main()
|