| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- """
- This test suite exercises some system calls subject to interruption with EINTR,
- to check that it is actually handled transparently.
- It is intended to be run by the main test suite within a child process, to
- ensure there is no background thread running (so that signals are delivered to
- the correct thread).
- Signals are generated in-process using setitimer(ITIMER_REAL), which allows
- sub-second periodicity (contrarily to signal()).
- """
- import contextlib
- import faulthandler
- import fcntl
- import os
- import platform
- import select
- import signal
- import socket
- import subprocess
- import sys
- import time
- import unittest
- from test import support
- from test.support import os_helper
- from test.support import socket_helper
- @contextlib.contextmanager
- def kill_on_error(proc):
- """Context manager killing the subprocess if a Python exception is raised."""
- with proc:
- try:
- yield proc
- except:
- proc.kill()
- raise
- @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
- class EINTRBaseTest(unittest.TestCase):
- """ Base class for EINTR tests. """
- # delay for initial signal delivery
- signal_delay = 0.1
- # signal delivery periodicity
- signal_period = 0.1
- # default sleep time for tests - should obviously have:
- # sleep_time > signal_period
- sleep_time = 0.2
- def sighandler(self, signum, frame):
- self.signals += 1
- def setUp(self):
- self.signals = 0
- self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler)
- signal.setitimer(signal.ITIMER_REAL, self.signal_delay,
- self.signal_period)
- # Use faulthandler as watchdog to debug when a test hangs
- # (timeout of 10 minutes)
- faulthandler.dump_traceback_later(10 * 60, exit=True,
- file=sys.__stderr__)
- @staticmethod
- def stop_alarm():
- signal.setitimer(signal.ITIMER_REAL, 0, 0)
- def tearDown(self):
- self.stop_alarm()
- signal.signal(signal.SIGALRM, self.orig_handler)
- faulthandler.cancel_dump_traceback_later()
- def subprocess(self, *args, **kw):
- cmd_args = (sys.executable, '-c') + args
- return subprocess.Popen(cmd_args, **kw)
- @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
- class OSEINTRTest(EINTRBaseTest):
- """ EINTR tests for the os module. """
- def new_sleep_process(self):
- code = 'import time; time.sleep(%r)' % self.sleep_time
- return self.subprocess(code)
- def _test_wait_multiple(self, wait_func):
- num = 3
- processes = [self.new_sleep_process() for _ in range(num)]
- for _ in range(num):
- wait_func()
- # Call the Popen method to avoid a ResourceWarning
- for proc in processes:
- proc.wait()
- def test_wait(self):
- self._test_wait_multiple(os.wait)
- @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
- def test_wait3(self):
- self._test_wait_multiple(lambda: os.wait3(0))
- def _test_wait_single(self, wait_func):
- proc = self.new_sleep_process()
- wait_func(proc.pid)
- # Call the Popen method to avoid a ResourceWarning
- proc.wait()
- def test_waitpid(self):
- self._test_wait_single(lambda pid: os.waitpid(pid, 0))
- @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
- def test_wait4(self):
- self._test_wait_single(lambda pid: os.wait4(pid, 0))
- def test_read(self):
- rd, wr = os.pipe()
- self.addCleanup(os.close, rd)
- # wr closed explicitly by parent
- # the payload below are smaller than PIPE_BUF, hence the writes will be
- # atomic
- datas = [b"hello", b"world", b"spam"]
- code = '\n'.join((
- 'import os, sys, time',
- '',
- 'wr = int(sys.argv[1])',
- 'datas = %r' % datas,
- 'sleep_time = %r' % self.sleep_time,
- '',
- 'for data in datas:',
- ' # let the parent block on read()',
- ' time.sleep(sleep_time)',
- ' os.write(wr, data)',
- ))
- proc = self.subprocess(code, str(wr), pass_fds=[wr])
- with kill_on_error(proc):
- os.close(wr)
- for data in datas:
- self.assertEqual(data, os.read(rd, len(data)))
- self.assertEqual(proc.wait(), 0)
- def test_write(self):
- rd, wr = os.pipe()
- self.addCleanup(os.close, wr)
- # rd closed explicitly by parent
- # we must write enough data for the write() to block
- data = b"x" * support.PIPE_MAX_SIZE
- code = '\n'.join((
- 'import io, os, sys, time',
- '',
- 'rd = int(sys.argv[1])',
- 'sleep_time = %r' % self.sleep_time,
- 'data = b"x" * %s' % support.PIPE_MAX_SIZE,
- 'data_len = len(data)',
- '',
- '# let the parent block on write()',
- 'time.sleep(sleep_time)',
- '',
- 'read_data = io.BytesIO()',
- 'while len(read_data.getvalue()) < data_len:',
- ' chunk = os.read(rd, 2 * data_len)',
- ' read_data.write(chunk)',
- '',
- 'value = read_data.getvalue()',
- 'if value != data:',
- ' raise Exception("read error: %s vs %s bytes"',
- ' % (len(value), data_len))',
- ))
- proc = self.subprocess(code, str(rd), pass_fds=[rd])
- with kill_on_error(proc):
- os.close(rd)
- written = 0
- while written < len(data):
- written += os.write(wr, memoryview(data)[written:])
- self.assertEqual(proc.wait(), 0)
- @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
- class SocketEINTRTest(EINTRBaseTest):
- """ EINTR tests for the socket module. """
- @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
- def _test_recv(self, recv_func):
- rd, wr = socket.socketpair()
- self.addCleanup(rd.close)
- # wr closed explicitly by parent
- # single-byte payload guard us against partial recv
- datas = [b"x", b"y", b"z"]
- code = '\n'.join((
- 'import os, socket, sys, time',
- '',
- 'fd = int(sys.argv[1])',
- 'family = %s' % int(wr.family),
- 'sock_type = %s' % int(wr.type),
- 'datas = %r' % datas,
- 'sleep_time = %r' % self.sleep_time,
- '',
- 'wr = socket.fromfd(fd, family, sock_type)',
- 'os.close(fd)',
- '',
- 'with wr:',
- ' for data in datas:',
- ' # let the parent block on recv()',
- ' time.sleep(sleep_time)',
- ' wr.sendall(data)',
- ))
- fd = wr.fileno()
- proc = self.subprocess(code, str(fd), pass_fds=[fd])
- with kill_on_error(proc):
- wr.close()
- for data in datas:
- self.assertEqual(data, recv_func(rd, len(data)))
- self.assertEqual(proc.wait(), 0)
- def test_recv(self):
- self._test_recv(socket.socket.recv)
- @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
- def test_recvmsg(self):
- self._test_recv(lambda sock, data: sock.recvmsg(data)[0])
- def _test_send(self, send_func):
- rd, wr = socket.socketpair()
- self.addCleanup(wr.close)
- # rd closed explicitly by parent
- # we must send enough data for the send() to block
- data = b"xyz" * (support.SOCK_MAX_SIZE // 3)
- code = '\n'.join((
- 'import os, socket, sys, time',
- '',
- 'fd = int(sys.argv[1])',
- 'family = %s' % int(rd.family),
- 'sock_type = %s' % int(rd.type),
- 'sleep_time = %r' % self.sleep_time,
- 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
- 'data_len = len(data)',
- '',
- 'rd = socket.fromfd(fd, family, sock_type)',
- 'os.close(fd)',
- '',
- 'with rd:',
- ' # let the parent block on send()',
- ' time.sleep(sleep_time)',
- '',
- ' received_data = bytearray(data_len)',
- ' n = 0',
- ' while n < data_len:',
- ' n += rd.recv_into(memoryview(received_data)[n:])',
- '',
- 'if received_data != data:',
- ' raise Exception("recv error: %s vs %s bytes"',
- ' % (len(received_data), data_len))',
- ))
- fd = rd.fileno()
- proc = self.subprocess(code, str(fd), pass_fds=[fd])
- with kill_on_error(proc):
- rd.close()
- written = 0
- while written < len(data):
- sent = send_func(wr, memoryview(data)[written:])
- # sendall() returns None
- written += len(data) if sent is None else sent
- self.assertEqual(proc.wait(), 0)
- def test_send(self):
- self._test_send(socket.socket.send)
- def test_sendall(self):
- self._test_send(socket.socket.sendall)
- @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
- def test_sendmsg(self):
- self._test_send(lambda sock, data: sock.sendmsg([data]))
- def test_accept(self):
- sock = socket.create_server((socket_helper.HOST, 0))
- self.addCleanup(sock.close)
- port = sock.getsockname()[1]
- code = '\n'.join((
- 'import socket, time',
- '',
- 'host = %r' % socket_helper.HOST,
- 'port = %s' % port,
- 'sleep_time = %r' % self.sleep_time,
- '',
- '# let parent block on accept()',
- 'time.sleep(sleep_time)',
- 'with socket.create_connection((host, port)):',
- ' time.sleep(sleep_time)',
- ))
- proc = self.subprocess(code)
- with kill_on_error(proc):
- client_sock, _ = sock.accept()
- client_sock.close()
- self.assertEqual(proc.wait(), 0)
- # Issue #25122: There is a race condition in the FreeBSD kernel on
- # handling signals in the FIFO device. Skip the test until the bug is
- # fixed in the kernel.
- # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
- @support.requires_freebsd_version(10, 3)
- @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
- def _test_open(self, do_open_close_reader, do_open_close_writer):
- filename = os_helper.TESTFN
- # Use a fifo: until the child opens it for reading, the parent will
- # block when trying to open it for writing.
- os_helper.unlink(filename)
- try:
- os.mkfifo(filename)
- except PermissionError as e:
- self.skipTest('os.mkfifo(): %s' % e)
- self.addCleanup(os_helper.unlink, filename)
- code = '\n'.join((
- 'import os, time',
- '',
- 'path = %a' % filename,
- 'sleep_time = %r' % self.sleep_time,
- '',
- '# let the parent block',
- 'time.sleep(sleep_time)',
- '',
- do_open_close_reader,
- ))
- proc = self.subprocess(code)
- with kill_on_error(proc):
- do_open_close_writer(filename)
- self.assertEqual(proc.wait(), 0)
- def python_open(self, path):
- fp = open(path, 'w')
- fp.close()
- @unittest.skipIf(sys.platform == "darwin",
- "hangs under macOS; see bpo-25234, bpo-35363")
- def test_open(self):
- self._test_open("fp = open(path, 'r')\nfp.close()",
- self.python_open)
- def os_open(self, path):
- fd = os.open(path, os.O_WRONLY)
- os.close(fd)
- @unittest.skipIf(sys.platform == "darwin",
- "hangs under macOS; see bpo-25234, bpo-35363")
- def test_os_open(self):
- self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
- self.os_open)
- @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
- class TimeEINTRTest(EINTRBaseTest):
- """ EINTR tests for the time module. """
- def test_sleep(self):
- t0 = time.monotonic()
- time.sleep(self.sleep_time)
- self.stop_alarm()
- dt = time.monotonic() - t0
- self.assertGreaterEqual(dt, self.sleep_time)
- @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
- # bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
- # is vulnerable to a race condition between the child and the parent processes.
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- class SignalEINTRTest(EINTRBaseTest):
- """ EINTR tests for the signal module. """
- def check_sigwait(self, wait_func):
- signum = signal.SIGUSR1
- pid = os.getpid()
- old_handler = signal.signal(signum, lambda *args: None)
- self.addCleanup(signal.signal, signum, old_handler)
- code = '\n'.join((
- 'import os, time',
- 'pid = %s' % os.getpid(),
- 'signum = %s' % int(signum),
- 'sleep_time = %r' % self.sleep_time,
- 'time.sleep(sleep_time)',
- 'os.kill(pid, signum)',
- ))
- old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum])
- t0 = time.monotonic()
- proc = self.subprocess(code)
- with kill_on_error(proc):
- wait_func(signum)
- dt = time.monotonic() - t0
- self.assertEqual(proc.wait(), 0)
- @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
- 'need signal.sigwaitinfo()')
- def test_sigwaitinfo(self):
- def wait_func(signum):
- signal.sigwaitinfo([signum])
- self.check_sigwait(wait_func)
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigwaitinfo()')
- def test_sigtimedwait(self):
- def wait_func(signum):
- signal.sigtimedwait([signum], 120.0)
- self.check_sigwait(wait_func)
- @unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
- class SelectEINTRTest(EINTRBaseTest):
- """ EINTR tests for the select module. """
- def test_select(self):
- t0 = time.monotonic()
- select.select([], [], [], self.sleep_time)
- dt = time.monotonic() - t0
- self.stop_alarm()
- self.assertGreaterEqual(dt, self.sleep_time)
- @unittest.skipIf(sys.platform == "darwin",
- "poll may fail on macOS; see issue #28087")
- @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
- def test_poll(self):
- poller = select.poll()
- t0 = time.monotonic()
- poller.poll(self.sleep_time * 1e3)
- dt = time.monotonic() - t0
- self.stop_alarm()
- self.assertGreaterEqual(dt, self.sleep_time)
- @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
- def test_epoll(self):
- poller = select.epoll()
- self.addCleanup(poller.close)
- t0 = time.monotonic()
- poller.poll(self.sleep_time)
- dt = time.monotonic() - t0
- self.stop_alarm()
- self.assertGreaterEqual(dt, self.sleep_time)
- @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
- def test_kqueue(self):
- kqueue = select.kqueue()
- self.addCleanup(kqueue.close)
- t0 = time.monotonic()
- kqueue.control(None, 1, self.sleep_time)
- dt = time.monotonic() - t0
- self.stop_alarm()
- self.assertGreaterEqual(dt, self.sleep_time)
- @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
- def test_devpoll(self):
- poller = select.devpoll()
- self.addCleanup(poller.close)
- t0 = time.monotonic()
- poller.poll(self.sleep_time * 1e3)
- dt = time.monotonic() - t0
- self.stop_alarm()
- self.assertGreaterEqual(dt, self.sleep_time)
- class FNTLEINTRTest(EINTRBaseTest):
- def _lock(self, lock_func, lock_name):
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- code = '\n'.join((
- "import fcntl, time",
- "with open('%s', 'wb') as f:" % os_helper.TESTFN,
- " fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
- " time.sleep(%s)" % self.sleep_time))
- start_time = time.monotonic()
- proc = self.subprocess(code)
- with kill_on_error(proc):
- with open(os_helper.TESTFN, 'wb') as f:
- while True: # synchronize the subprocess
- dt = time.monotonic() - start_time
- if dt > 60.0:
- raise Exception("failed to sync child in %.1f sec" % dt)
- try:
- lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
- lock_func(f, fcntl.LOCK_UN)
- time.sleep(0.01)
- except BlockingIOError:
- break
- # the child locked the file just a moment ago for 'sleep_time' seconds
- # that means that the lock below will block for 'sleep_time' minus some
- # potential context switch delay
- lock_func(f, fcntl.LOCK_EX)
- dt = time.monotonic() - start_time
- self.assertGreaterEqual(dt, self.sleep_time)
- self.stop_alarm()
- proc.wait()
- # Issue 35633: See https://bugs.python.org/issue35633#msg333662
- # skip test rather than accept PermissionError from all platforms
- @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
- def test_lockf(self):
- self._lock(fcntl.lockf, "lockf")
- def test_flock(self):
- self._lock(fcntl.flock, "flock")
- if __name__ == "__main__":
- unittest.main()
|