test_pty.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. from test.support import verbose, reap_children
  2. from test.support.import_helper import import_module
  3. # Skip these tests if termios or fcntl are not available
  4. import_module('termios')
  5. import_module("fcntl")
  6. import errno
  7. import os
  8. import pty
  9. import tty
  10. import sys
  11. import select
  12. import signal
  13. import socket
  14. import io # readline
  15. import unittest
  16. import struct
  17. import fcntl
  18. import warnings
  19. TEST_STRING_1 = b"I wish to buy a fish license.\n"
  20. TEST_STRING_2 = b"For my pet fish, Eric.\n"
  21. try:
  22. _TIOCGWINSZ = tty.TIOCGWINSZ
  23. _TIOCSWINSZ = tty.TIOCSWINSZ
  24. _HAVE_WINSZ = True
  25. except AttributeError:
  26. _HAVE_WINSZ = False
  27. if verbose:
  28. def debug(msg):
  29. print(msg)
  30. else:
  31. def debug(msg):
  32. pass
  33. # Note that os.read() is nondeterministic so we need to be very careful
  34. # to make the test suite deterministic. A normal call to os.read() may
  35. # give us less than expected.
  36. #
  37. # Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I get
  38. # back 'foo\r\n' at the other end. The behavior depends on the termios
  39. # setting. The newline translation may be OS-specific. To make the
  40. # test suite deterministic and OS-independent, the functions _readline
  41. # and normalize_output can be used.
  42. def normalize_output(data):
  43. # Some operating systems do conversions on newline. We could possibly fix
  44. # that by doing the appropriate termios.tcsetattr()s. I couldn't figure out
  45. # the right combo on Tru64. So, just normalize the output and doc the
  46. # problem O/Ses by allowing certain combinations for some platforms, but
  47. # avoid allowing other differences (like extra whitespace, trailing garbage,
  48. # etc.)
  49. # This is about the best we can do without getting some feedback
  50. # from someone more knowledgable.
  51. # OSF/1 (Tru64) apparently turns \n into \r\r\n.
  52. if data.endswith(b'\r\r\n'):
  53. return data.replace(b'\r\r\n', b'\n')
  54. if data.endswith(b'\r\n'):
  55. return data.replace(b'\r\n', b'\n')
  56. return data
  57. def _readline(fd):
  58. """Read one line. May block forever if no newline is read."""
  59. reader = io.FileIO(fd, mode='rb', closefd=False)
  60. return reader.readline()
  61. def expectedFailureIfStdinIsTTY(fun):
  62. # avoid isatty()
  63. try:
  64. tty.tcgetattr(pty.STDIN_FILENO)
  65. return unittest.expectedFailure(fun)
  66. except tty.error:
  67. pass
  68. return fun
  69. def _get_term_winsz(fd):
  70. s = struct.pack("HHHH", 0, 0, 0, 0)
  71. return fcntl.ioctl(fd, _TIOCGWINSZ, s)
def _set_term_winsz(fd, winsz):
    """Issue the _TIOCSWINSZ ioctl on *fd* with *winsz* as its argument.

    Callers in this file pass *winsz* as a struct.pack("HHHH", ...) buffer.
    """
    fcntl.ioctl(fd, _TIOCSWINSZ, winsz)
  74. # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
  75. # because pty code is not too portable.
class PtyTest(unittest.TestCase):
    """Tests that exercise pty.openpty(), pty.fork() and pty.spawn()."""

    def setUp(self):
        """Install signal handlers, arm a watchdog alarm, record stdin's size.

        Every change made here is paired with an addCleanup() call that
        undoes it.
        """
        old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
        self.addCleanup(signal.signal, signal.SIGALRM, old_alarm)

        old_sighup = signal.signal(signal.SIGHUP, self.handle_sighup)
        self.addCleanup(signal.signal, signal.SIGHUP, old_sighup)

        # isatty() and close() can hang on some platforms. Set an alarm
        # before running the test to make sure we don't hang forever.
        self.addCleanup(signal.alarm, 0)
        signal.alarm(10)

        # Save original stdin window size
        # (both stay None when the ioctls are unavailable or the size query
        # on stdin raises OSError).
        self.stdin_rows = None
        self.stdin_cols = None
        if _HAVE_WINSZ:
            try:
                stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
                self.stdin_rows = stdin_dim.lines
                self.stdin_cols = stdin_dim.columns
                old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
                                              self.stdin_cols, 0, 0)
                # test_openpty() changes stdin's window size; put it back.
                self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
            except OSError:
                pass

    def handle_sig(self, sig, frame):
        """SIGALRM handler installed by setUp(): fail the running test."""
        self.fail("isatty hung")

    @staticmethod
    def handle_sighup(signum, frame):
        """SIGHUP handler installed by setUp(): deliberately does nothing."""
        pass

    # Marked as an expected failure when tty.tcgetattr() succeeds on stdin
    # (see expectedFailureIfStdinIsTTY).
    @expectedFailureIfStdinIsTTY
    def test_openpty(self):
        # Capture stdin's termios mode, if stdin is a terminal.
        try:
            mode = tty.tcgetattr(pty.STDIN_FILENO)
        except tty.error:
            # not a tty or bad/closed fd
            debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
            mode = None

        new_stdin_winsz = None
        if self.stdin_rows is not None and self.stdin_cols is not None:
            try:
                # Modify pty.STDIN_FILENO window size; we need to
                # check if pty.openpty() is able to set pty slave
                # window size accordingly.
                debug("Setting pty.STDIN_FILENO window size")
                debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})")
                target_stdin_rows = self.stdin_rows + 1
                target_stdin_cols = self.stdin_cols + 1
                debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})")
                target_stdin_winsz = struct.pack("HHHH", target_stdin_rows,
                                                 target_stdin_cols, 0, 0)
                _set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)

                # Were we able to set the window size
                # of pty.STDIN_FILENO successfully?
                new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
                self.assertEqual(new_stdin_winsz, target_stdin_winsz,
                                 "pty.STDIN_FILENO window size unchanged")
            except OSError:
                warnings.warn("Failed to set pty.STDIN_FILENO window size")
                pass

        try:
            debug("Calling pty.openpty()")
            # First try passing the mode and window size; if openpty() does
            # not accept arguments (TypeError), use the no-argument form.
            try:
                master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
            except TypeError:
                master_fd, slave_fd = pty.openpty()
            debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
        except OSError:
            # " An optional feature could not be imported " ... ?
            raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")

        # closing master_fd can raise a SIGHUP if the process is
        # the session leader: we installed a SIGHUP signal handler
        # to ignore this signal.
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)

        self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")

        # These two checks only run when the corresponding value was
        # obtained from stdin above.
        if mode:
            self.assertEqual(tty.tcgetattr(slave_fd), mode,
                             "openpty() failed to set slave termios")
        if new_stdin_winsz:
            self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
                             "openpty() failed to set slave window size")

        # Ensure the fd is non-blocking in case there's nothing to read.
        blocking = os.get_blocking(master_fd)
        try:
            os.set_blocking(master_fd, False)
            try:
                s1 = os.read(master_fd, 1024)
                self.assertEqual(b'', s1)
            except OSError as e:
                # EAGAIN is tolerated; any other error is re-raised.
                if e.errno != errno.EAGAIN:
                    raise
        finally:
            # Restore the original flags.
            os.set_blocking(master_fd, blocking)

        debug("Writing to slave_fd")
        os.write(slave_fd, TEST_STRING_1)
        s1 = _readline(master_fd)
        self.assertEqual(b'I wish to buy a fish license.\n',
                         normalize_output(s1))

        # Two separate writes must still be read back as a single line.
        debug("Writing chunked output")
        os.write(slave_fd, TEST_STRING_2[:5])
        os.write(slave_fd, TEST_STRING_2[5:])
        s2 = _readline(master_fd)
        self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2))

    def test_fork(self):
        # The child reports its findings through its exit code, which the
        # parent branch below maps to a failure message:
        #   1 - unexpected exception from os.setsid()
        #   2 - os.setsid() succeeded
        #   3 - fd 1 is not a tty
        #   4 - end of the child branch reached (no failure is reported)
        debug("calling pty.fork()")
        pid, master_fd = pty.fork()
        self.addCleanup(os.close, master_fd)
        if pid == pty.CHILD:
            # stdout should be connected to a tty.
            if not os.isatty(1):
                debug("Child's fd 1 is not a tty?!")
                os._exit(3)

            # After pty.fork(), the child should already be a session leader.
            # (on those systems that have that concept.)
            debug("In child, calling os.setsid()")
            try:
                os.setsid()
            except OSError:
                # Good, we already were session leader
                debug("Good: OSError was raised.")
                pass
            except AttributeError:
                # Have pty, but not setsid()?
                debug("No setsid() available?")
                pass
            except:
                # We don't want this error to propagate, escaping the call to
                # os._exit() and causing very peculiar behavior in the calling
                # regrtest.py !
                # Note: could add traceback printing here.
                debug("An unexpected error was raised.")
                os._exit(1)
            else:
                debug("os.setsid() succeeded! (bad!)")
                os._exit(2)
            os._exit(4)
        else:
            debug("Waiting for child (%d) to finish." % pid)
            # In verbose mode, we have to consume the debug output from the
            # child or the child will block, causing this test to hang in the
            # parent's waitpid() call.  The child blocks after a
            # platform-dependent amount of data is written to its fd.  On
            # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
            # X even the small writes in the child above will block it.  Also
            # on Linux, the read() will raise an OSError (input/output error)
            # when it tries to read past the end of the buffer but the child's
            # already exited, so catch and discard those exceptions.  It's not
            # worth checking for EIO.
            while True:
                try:
                    data = os.read(master_fd, 80)
                except OSError:
                    break
                if not data:
                    break
                sys.stdout.write(str(data.replace(b'\r\n', b'\n'),
                                     encoding='ascii'))

            ##line = os.read(master_fd, 80)
            ##lines = line.replace('\r\n', '\n').split('\n')
            ##if False and lines != ['In child, calling os.setsid()',
            ##                       'Good: OSError was raised.', '']:
            ##    raise TestFailed("Unexpected output from child: %r" % line)

            (pid, status) = os.waitpid(pid, 0)
            res = os.waitstatus_to_exitcode(status)
            debug("Child (%d) exited with code %d (status %d)." % (pid, res, status))
            if res == 1:
                self.fail("Child raised an unexpected exception in os.setsid()")
            elif res == 2:
                self.fail("pty.fork() failed to make child a session leader.")
            elif res == 3:
                self.fail("Child spawned by pty.fork() did not have a tty as stdout")
            elif res != 4:
                self.fail("pty.fork() failed for unknown reasons.")

            ##debug("Reading from master_fd now that the child has exited")
            ##try:
            ##    s1 = os.read(master_fd, 1024)
            ##except OSError:
            ##    pass
            ##else:
            ##    raise TestFailed("Read from master_fd did not raise exception")

    def test_master_read(self):
        # Reading from the master end after the slave end has been closed
        # must produce no data, whether that shows up as an empty read or
        # as an OSError.
        # XXX(nnorwitz):  this test leaks fds when there is an error.
        # NOTE(review): master_fd is now closed through addCleanup() and
        # slave_fd is closed right below, so the XXX note may be outdated.
        debug("Calling pty.openpty()")
        master_fd, slave_fd = pty.openpty()
        debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")

        self.addCleanup(os.close, master_fd)

        debug("Closing slave_fd")
        os.close(slave_fd)

        debug("Reading from master_fd")
        try:
            data = os.read(master_fd, 1)
        except OSError: # Linux
            data = b""

        self.assertEqual(data, b"")

    def test_spawn_doesnt_hang(self):
        # There is no assertion here: the test passes if pty.spawn() returns.
        # The alarm armed in setUp() fails the test if it does not.
        pty.spawn([sys.executable, '-c', 'print("hi there")'])
  272. class SmallPtyTests(unittest.TestCase):
  273. """These tests don't spawn children or hang."""
  274. def setUp(self):
  275. self.orig_stdin_fileno = pty.STDIN_FILENO
  276. self.orig_stdout_fileno = pty.STDOUT_FILENO
  277. self.orig_pty_close = pty.close
  278. self.orig_pty__copy = pty._copy
  279. self.orig_pty_fork = pty.fork
  280. self.orig_pty_select = pty.select
  281. self.orig_pty_setraw = pty.setraw
  282. self.orig_pty_tcgetattr = pty.tcgetattr
  283. self.orig_pty_tcsetattr = pty.tcsetattr
  284. self.orig_pty_waitpid = pty.waitpid
  285. self.fds = [] # A list of file descriptors to close.
  286. self.files = []
  287. self.select_rfds_lengths = []
  288. self.select_rfds_results = []
  289. self.tcsetattr_mode_setting = None
  290. def tearDown(self):
  291. pty.STDIN_FILENO = self.orig_stdin_fileno
  292. pty.STDOUT_FILENO = self.orig_stdout_fileno
  293. pty.close = self.orig_pty_close
  294. pty._copy = self.orig_pty__copy
  295. pty.fork = self.orig_pty_fork
  296. pty.select = self.orig_pty_select
  297. pty.setraw = self.orig_pty_setraw
  298. pty.tcgetattr = self.orig_pty_tcgetattr
  299. pty.tcsetattr = self.orig_pty_tcsetattr
  300. pty.waitpid = self.orig_pty_waitpid
  301. for file in self.files:
  302. try:
  303. file.close()
  304. except OSError:
  305. pass
  306. for fd in self.fds:
  307. try:
  308. os.close(fd)
  309. except OSError:
  310. pass
  311. def _pipe(self):
  312. pipe_fds = os.pipe()
  313. self.fds.extend(pipe_fds)
  314. return pipe_fds
  315. def _socketpair(self):
  316. socketpair = socket.socketpair()
  317. self.files.extend(socketpair)
  318. return socketpair
  319. def _mock_select(self, rfds, wfds, xfds, timeout=0):
  320. # This will raise IndexError when no more expected calls exist.
  321. # This ignores the timeout
  322. self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
  323. return self.select_rfds_results.pop(0), [], []
  324. def _make_mock_fork(self, pid):
  325. def mock_fork():
  326. return (pid, 12)
  327. return mock_fork
  328. def _mock_tcsetattr(self, fileno, opt, mode):
  329. self.tcsetattr_mode_setting = mode
  330. def test__copy_to_each(self):
  331. """Test the normal data case on both master_fd and stdin."""
  332. read_from_stdout_fd, mock_stdout_fd = self._pipe()
  333. pty.STDOUT_FILENO = mock_stdout_fd
  334. mock_stdin_fd, write_to_stdin_fd = self._pipe()
  335. pty.STDIN_FILENO = mock_stdin_fd
  336. socketpair = self._socketpair()
  337. masters = [s.fileno() for s in socketpair]
  338. # Feed data. Smaller than PIPEBUF. These writes will not block.
  339. os.write(masters[1], b'from master')
  340. os.write(write_to_stdin_fd, b'from stdin')
  341. # Expect two select calls, the last one will cause IndexError
  342. pty.select = self._mock_select
  343. self.select_rfds_lengths.append(2)
  344. self.select_rfds_results.append([mock_stdin_fd, masters[0]])
  345. self.select_rfds_lengths.append(2)
  346. with self.assertRaises(IndexError):
  347. pty._copy(masters[0])
  348. # Test that the right data went to the right places.
  349. rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
  350. self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
  351. self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
  352. self.assertEqual(os.read(masters[1], 20), b'from stdin')
  353. def test__copy_eof_on_all(self):
  354. """Test the empty read EOF case on both master_fd and stdin."""
  355. read_from_stdout_fd, mock_stdout_fd = self._pipe()
  356. pty.STDOUT_FILENO = mock_stdout_fd
  357. mock_stdin_fd, write_to_stdin_fd = self._pipe()
  358. pty.STDIN_FILENO = mock_stdin_fd
  359. socketpair = self._socketpair()
  360. masters = [s.fileno() for s in socketpair]
  361. socketpair[1].close()
  362. os.close(write_to_stdin_fd)
  363. pty.select = self._mock_select
  364. self.select_rfds_lengths.append(2)
  365. self.select_rfds_results.append([mock_stdin_fd, masters[0]])
  366. # We expect that both fds were removed from the fds list as they
  367. # both encountered an EOF before the second select call.
  368. self.select_rfds_lengths.append(0)
  369. # We expect the function to return without error.
  370. self.assertEqual(pty._copy(masters[0]), None)
  371. def test__restore_tty_mode_normal_return(self):
  372. """Test that spawn resets the tty mode no when _copy returns normally."""
  373. # PID 1 is returned from mocked fork to run the parent branch
  374. # of code
  375. pty.fork = self._make_mock_fork(1)
  376. status_sentinel = object()
  377. pty.waitpid = lambda _1, _2: [None, status_sentinel]
  378. pty.close = lambda _: None
  379. pty._copy = lambda _1, _2, _3: None
  380. mode_sentinel = object()
  381. pty.tcgetattr = lambda fd: mode_sentinel
  382. pty.tcsetattr = self._mock_tcsetattr
  383. pty.setraw = lambda _: None
  384. self.assertEqual(pty.spawn([]), status_sentinel, "pty.waitpid process status not returned by pty.spawn")
  385. self.assertEqual(self.tcsetattr_mode_setting, mode_sentinel, "pty.tcsetattr not called with original mode value")
def tearDownModule():
    """Module-level teardown: call reap_children() from test.support."""
    reap_children()
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()