test_telnetlib.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. import socket
  2. import selectors
  3. import threading
  4. import contextlib
  5. from test import support
  6. from test.support import socket_helper, warnings_helper
  7. import unittest
  8. support.requires_working_socket(module=True)
  9. telnetlib = warnings_helper.import_deprecated('telnetlib')
  10. HOST = socket_helper.HOST
  11. def server(evt, serv):
  12. serv.listen()
  13. evt.set()
  14. try:
  15. conn, addr = serv.accept()
  16. conn.close()
  17. except TimeoutError:
  18. pass
  19. finally:
  20. serv.close()
  21. class GeneralTests(unittest.TestCase):
  22. def setUp(self):
  23. self.evt = threading.Event()
  24. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  25. self.sock.settimeout(60) # Safety net. Look issue 11812
  26. self.port = socket_helper.bind_port(self.sock)
  27. self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
  28. self.thread.daemon = True
  29. self.thread.start()
  30. self.evt.wait()
  31. def tearDown(self):
  32. self.thread.join()
  33. del self.thread # Clear out any dangling Thread objects.
  34. def testBasic(self):
  35. # connects
  36. telnet = telnetlib.Telnet(HOST, self.port)
  37. telnet.sock.close()
  38. def testContextManager(self):
  39. with telnetlib.Telnet(HOST, self.port) as tn:
  40. self.assertIsNotNone(tn.get_socket())
  41. self.assertIsNone(tn.get_socket())
  42. def testTimeoutDefault(self):
  43. self.assertTrue(socket.getdefaulttimeout() is None)
  44. socket.setdefaulttimeout(30)
  45. try:
  46. telnet = telnetlib.Telnet(HOST, self.port)
  47. finally:
  48. socket.setdefaulttimeout(None)
  49. self.assertEqual(telnet.sock.gettimeout(), 30)
  50. telnet.sock.close()
  51. def testTimeoutNone(self):
  52. # None, having other default
  53. self.assertTrue(socket.getdefaulttimeout() is None)
  54. socket.setdefaulttimeout(30)
  55. try:
  56. telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
  57. finally:
  58. socket.setdefaulttimeout(None)
  59. self.assertTrue(telnet.sock.gettimeout() is None)
  60. telnet.sock.close()
  61. def testTimeoutValue(self):
  62. telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
  63. self.assertEqual(telnet.sock.gettimeout(), 30)
  64. telnet.sock.close()
  65. def testTimeoutOpen(self):
  66. telnet = telnetlib.Telnet()
  67. telnet.open(HOST, self.port, timeout=30)
  68. self.assertEqual(telnet.sock.gettimeout(), 30)
  69. telnet.sock.close()
  70. def testGetters(self):
  71. # Test telnet getter methods
  72. telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
  73. t_sock = telnet.sock
  74. self.assertEqual(telnet.get_socket(), t_sock)
  75. self.assertEqual(telnet.fileno(), t_sock.fileno())
  76. telnet.sock.close()
  77. class SocketStub(object):
  78. ''' a socket proxy that re-defines sendall() '''
  79. def __init__(self, reads=()):
  80. self.reads = list(reads) # Intentionally make a copy.
  81. self.writes = []
  82. self.block = False
  83. def sendall(self, data):
  84. self.writes.append(data)
  85. def recv(self, size):
  86. out = b''
  87. while self.reads and len(out) < size:
  88. out += self.reads.pop(0)
  89. if len(out) > size:
  90. self.reads.insert(0, out[size:])
  91. out = out[:size]
  92. return out
  93. class TelnetAlike(telnetlib.Telnet):
  94. def fileno(self):
  95. raise NotImplementedError()
  96. def close(self): pass
  97. def sock_avail(self):
  98. return (not self.sock.block)
  99. def msg(self, msg, *args):
  100. with support.captured_stdout() as out:
  101. telnetlib.Telnet.msg(self, msg, *args)
  102. self._messages += out.getvalue()
  103. return
  104. class MockSelector(selectors.BaseSelector):
  105. def __init__(self):
  106. self.keys = {}
  107. @property
  108. def resolution(self):
  109. return 1e-3
  110. def register(self, fileobj, events, data=None):
  111. key = selectors.SelectorKey(fileobj, 0, events, data)
  112. self.keys[fileobj] = key
  113. return key
  114. def unregister(self, fileobj):
  115. return self.keys.pop(fileobj)
  116. def select(self, timeout=None):
  117. block = False
  118. for fileobj in self.keys:
  119. if isinstance(fileobj, TelnetAlike):
  120. block = fileobj.sock.block
  121. break
  122. if block:
  123. return []
  124. else:
  125. return [(key, key.events) for key in self.keys.values()]
  126. def get_map(self):
  127. return self.keys
  128. @contextlib.contextmanager
  129. def test_socket(reads):
  130. def new_conn(*ignored):
  131. return SocketStub(reads)
  132. try:
  133. old_conn = socket.create_connection
  134. socket.create_connection = new_conn
  135. yield None
  136. finally:
  137. socket.create_connection = old_conn
  138. return
  139. def test_telnet(reads=(), cls=TelnetAlike):
  140. ''' return a telnetlib.Telnet object that uses a SocketStub with
  141. reads queued up to be read '''
  142. for x in reads:
  143. assert type(x) is bytes, x
  144. with test_socket(reads):
  145. telnet = cls('dummy', 0)
  146. telnet._messages = '' # debuglevel output
  147. return telnet
  148. class ExpectAndReadTestCase(unittest.TestCase):
  149. def setUp(self):
  150. self.old_selector = telnetlib._TelnetSelector
  151. telnetlib._TelnetSelector = MockSelector
  152. def tearDown(self):
  153. telnetlib._TelnetSelector = self.old_selector
  154. class ReadTests(ExpectAndReadTestCase):
  155. def test_read_until(self):
  156. """
  157. read_until(expected, timeout=None)
  158. test the blocking version of read_util
  159. """
  160. want = [b'xxxmatchyyy']
  161. telnet = test_telnet(want)
  162. data = telnet.read_until(b'match')
  163. self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))
  164. reads = [b'x' * 50, b'match', b'y' * 50]
  165. expect = b''.join(reads[:-1])
  166. telnet = test_telnet(reads)
  167. data = telnet.read_until(b'match')
  168. self.assertEqual(data, expect)
  169. def test_read_all(self):
  170. """
  171. read_all()
  172. Read all data until EOF; may block.
  173. """
  174. reads = [b'x' * 500, b'y' * 500, b'z' * 500]
  175. expect = b''.join(reads)
  176. telnet = test_telnet(reads)
  177. data = telnet.read_all()
  178. self.assertEqual(data, expect)
  179. return
  180. def test_read_some(self):
  181. """
  182. read_some()
  183. Read at least one byte or EOF; may block.
  184. """
  185. # test 'at least one byte'
  186. telnet = test_telnet([b'x' * 500])
  187. data = telnet.read_some()
  188. self.assertTrue(len(data) >= 1)
  189. # test EOF
  190. telnet = test_telnet()
  191. data = telnet.read_some()
  192. self.assertEqual(b'', data)
  193. def _read_eager(self, func_name):
  194. """
  195. read_*_eager()
  196. Read all data available already queued or on the socket,
  197. without blocking.
  198. """
  199. want = b'x' * 100
  200. telnet = test_telnet([want])
  201. func = getattr(telnet, func_name)
  202. telnet.sock.block = True
  203. self.assertEqual(b'', func())
  204. telnet.sock.block = False
  205. data = b''
  206. while True:
  207. try:
  208. data += func()
  209. except EOFError:
  210. break
  211. self.assertEqual(data, want)
  212. def test_read_eager(self):
  213. # read_eager and read_very_eager make the same guarantees
  214. # (they behave differently but we only test the guarantees)
  215. self._read_eager('read_eager')
  216. self._read_eager('read_very_eager')
  217. # NB -- we need to test the IAC block which is mentioned in the
  218. # docstring but not in the module docs
  219. def read_very_lazy(self):
  220. want = b'x' * 100
  221. telnet = test_telnet([want])
  222. self.assertEqual(b'', telnet.read_very_lazy())
  223. while telnet.sock.reads:
  224. telnet.fill_rawq()
  225. data = telnet.read_very_lazy()
  226. self.assertEqual(want, data)
  227. self.assertRaises(EOFError, telnet.read_very_lazy)
  228. def test_read_lazy(self):
  229. want = b'x' * 100
  230. telnet = test_telnet([want])
  231. self.assertEqual(b'', telnet.read_lazy())
  232. data = b''
  233. while True:
  234. try:
  235. read_data = telnet.read_lazy()
  236. data += read_data
  237. if not read_data:
  238. telnet.fill_rawq()
  239. except EOFError:
  240. break
  241. self.assertTrue(want.startswith(data))
  242. self.assertEqual(data, want)
  243. class nego_collector(object):
  244. def __init__(self, sb_getter=None):
  245. self.seen = b''
  246. self.sb_getter = sb_getter
  247. self.sb_seen = b''
  248. def do_nego(self, sock, cmd, opt):
  249. self.seen += cmd + opt
  250. if cmd == tl.SE and self.sb_getter:
  251. sb_data = self.sb_getter()
  252. self.sb_seen += sb_data
  253. tl = telnetlib
  254. class WriteTests(unittest.TestCase):
  255. '''The only thing that write does is replace each tl.IAC for
  256. tl.IAC+tl.IAC'''
  257. def test_write(self):
  258. data_sample = [b'data sample without IAC',
  259. b'data sample with' + tl.IAC + b' one IAC',
  260. b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
  261. tl.IAC,
  262. b'']
  263. for data in data_sample:
  264. telnet = test_telnet()
  265. telnet.write(data)
  266. written = b''.join(telnet.sock.writes)
  267. self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written)
  268. class OptionTests(unittest.TestCase):
  269. # RFC 854 commands
  270. cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
  271. def _test_command(self, data):
  272. """ helper for testing IAC + cmd """
  273. telnet = test_telnet(data)
  274. data_len = len(b''.join(data))
  275. nego = nego_collector()
  276. telnet.set_option_negotiation_callback(nego.do_nego)
  277. txt = telnet.read_all()
  278. cmd = nego.seen
  279. self.assertTrue(len(cmd) > 0) # we expect at least one command
  280. self.assertIn(cmd[:1], self.cmds)
  281. self.assertEqual(cmd[1:2], tl.NOOPT)
  282. self.assertEqual(data_len, len(txt + cmd))
  283. nego.sb_getter = None # break the nego => telnet cycle
  284. def test_IAC_commands(self):
  285. for cmd in self.cmds:
  286. self._test_command([tl.IAC, cmd])
  287. self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
  288. self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
  289. # all at once
  290. self._test_command([tl.IAC + cmd for (cmd) in self.cmds])
  291. def test_SB_commands(self):
  292. # RFC 855, subnegotiations portion
  293. send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
  294. tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
  295. tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
  296. tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
  297. tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
  298. ]
  299. telnet = test_telnet(send)
  300. nego = nego_collector(telnet.read_sb_data)
  301. telnet.set_option_negotiation_callback(nego.do_nego)
  302. txt = telnet.read_all()
  303. self.assertEqual(txt, b'')
  304. want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd'
  305. self.assertEqual(nego.sb_seen, want_sb_data)
  306. self.assertEqual(b'', telnet.read_sb_data())
  307. nego.sb_getter = None # break the nego => telnet cycle
  308. def test_debuglevel_reads(self):
  309. # test all the various places that self.msg(...) is called
  310. given_a_expect_b = [
  311. # Telnet.fill_rawq
  312. (b'a', ": recv b''\n"),
  313. # Telnet.process_rawq
  314. (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
  315. (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
  316. (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
  317. (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
  318. (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
  319. ]
  320. for a, b in given_a_expect_b:
  321. telnet = test_telnet([a])
  322. telnet.set_debuglevel(1)
  323. txt = telnet.read_all()
  324. self.assertIn(b, telnet._messages)
  325. return
  326. def test_debuglevel_write(self):
  327. telnet = test_telnet()
  328. telnet.set_debuglevel(1)
  329. telnet.write(b'xxx')
  330. expected = "send b'xxx'\n"
  331. self.assertIn(expected, telnet._messages)
  332. def test_debug_accepts_str_port(self):
  333. # Issue 10695
  334. with test_socket([]):
  335. telnet = TelnetAlike('dummy', '0')
  336. telnet._messages = ''
  337. telnet.set_debuglevel(1)
  338. telnet.msg('test')
  339. self.assertRegex(telnet._messages, r'0.*test')
  340. class ExpectTests(ExpectAndReadTestCase):
  341. def test_expect(self):
  342. """
  343. expect(expected, [timeout])
  344. Read until the expected string has been seen, or a timeout is
  345. hit (default is no timeout); may block.
  346. """
  347. want = [b'x' * 10, b'match', b'y' * 10]
  348. telnet = test_telnet(want)
  349. (_,_,data) = telnet.expect([b'match'])
  350. self.assertEqual(data, b''.join(want[:-1]))
  351. if __name__ == '__main__':
  352. unittest.main()