test_poplib.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. """Test script for poplib module."""
  2. # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
  3. # a real test suite
  4. import poplib
  5. import socket
  6. import os
  7. import errno
  8. import threading
  9. import unittest
  10. from unittest import TestCase, skipUnless
  11. from test import support as test_support
  12. from test.support import hashlib_helper
  13. from test.support import socket_helper
  14. from test.support import threading_helper
  15. from test.support import warnings_helper
  16. asynchat = warnings_helper.import_deprecated('asynchat')
  17. asyncore = warnings_helper.import_deprecated('asyncore')
  18. test_support.requires_working_socket(module=True)
  19. HOST = socket_helper.HOST
  20. PORT = 0
  21. SUPPORTS_SSL = False
  22. if hasattr(poplib, 'POP3_SSL'):
  23. import ssl
  24. SUPPORTS_SSL = True
  25. CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
  26. CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")
  27. requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
  28. # the dummy data returned by server when LIST and RETR commands are issued
  29. LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
  30. RETR_RESP = b"""From: postmaster@python.org\
  31. \r\nContent-Type: text/plain\r\n\
  32. MIME-Version: 1.0\r\n\
  33. Subject: Dummy\r\n\
  34. \r\n\
  35. line1\r\n\
  36. line2\r\n\
  37. line3\r\n\
  38. .\r\n"""
  39. class DummyPOP3Handler(asynchat.async_chat):
  40. CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
  41. enable_UTF8 = False
  42. def __init__(self, conn):
  43. asynchat.async_chat.__init__(self, conn)
  44. self.set_terminator(b"\r\n")
  45. self.in_buffer = []
  46. self.push('+OK dummy pop3 server ready. <timestamp>')
  47. self.tls_active = False
  48. self.tls_starting = False
  49. def collect_incoming_data(self, data):
  50. self.in_buffer.append(data)
  51. def found_terminator(self):
  52. line = b''.join(self.in_buffer)
  53. line = str(line, 'ISO-8859-1')
  54. self.in_buffer = []
  55. cmd = line.split(' ')[0].lower()
  56. space = line.find(' ')
  57. if space != -1:
  58. arg = line[space + 1:]
  59. else:
  60. arg = ""
  61. if hasattr(self, 'cmd_' + cmd):
  62. method = getattr(self, 'cmd_' + cmd)
  63. method(arg)
  64. else:
  65. self.push('-ERR unrecognized POP3 command "%s".' %cmd)
  66. def handle_error(self):
  67. raise
  68. def push(self, data):
  69. asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
  70. def cmd_echo(self, arg):
  71. # sends back the received string (used by the test suite)
  72. self.push(arg)
  73. def cmd_user(self, arg):
  74. if arg != "guido":
  75. self.push("-ERR no such user")
  76. self.push('+OK password required')
  77. def cmd_pass(self, arg):
  78. if arg != "python":
  79. self.push("-ERR wrong password")
  80. self.push('+OK 10 messages')
  81. def cmd_stat(self, arg):
  82. self.push('+OK 10 100')
  83. def cmd_list(self, arg):
  84. if arg:
  85. self.push('+OK %s %s' % (arg, arg))
  86. else:
  87. self.push('+OK')
  88. asynchat.async_chat.push(self, LIST_RESP)
  89. cmd_uidl = cmd_list
  90. def cmd_retr(self, arg):
  91. self.push('+OK %s bytes' %len(RETR_RESP))
  92. asynchat.async_chat.push(self, RETR_RESP)
  93. cmd_top = cmd_retr
  94. def cmd_dele(self, arg):
  95. self.push('+OK message marked for deletion.')
  96. def cmd_noop(self, arg):
  97. self.push('+OK done nothing.')
  98. def cmd_rpop(self, arg):
  99. self.push('+OK done nothing.')
  100. def cmd_apop(self, arg):
  101. self.push('+OK done nothing.')
  102. def cmd_quit(self, arg):
  103. self.push('+OK closing.')
  104. self.close_when_done()
  105. def _get_capas(self):
  106. _capas = dict(self.CAPAS)
  107. if not self.tls_active and SUPPORTS_SSL:
  108. _capas['STLS'] = []
  109. return _capas
  110. def cmd_capa(self, arg):
  111. self.push('+OK Capability list follows')
  112. if self._get_capas():
  113. for cap, params in self._get_capas().items():
  114. _ln = [cap]
  115. if params:
  116. _ln.extend(params)
  117. self.push(' '.join(_ln))
  118. self.push('.')
  119. def cmd_utf8(self, arg):
  120. self.push('+OK I know RFC6856'
  121. if self.enable_UTF8
  122. else '-ERR What is UTF8?!')
  123. if SUPPORTS_SSL:
  124. def cmd_stls(self, arg):
  125. if self.tls_active is False:
  126. self.push('+OK Begin TLS negotiation')
  127. context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  128. context.load_cert_chain(CERTFILE)
  129. tls_sock = context.wrap_socket(self.socket,
  130. server_side=True,
  131. do_handshake_on_connect=False,
  132. suppress_ragged_eofs=False)
  133. self.del_channel()
  134. self.set_socket(tls_sock)
  135. self.tls_active = True
  136. self.tls_starting = True
  137. self.in_buffer = []
  138. self._do_tls_handshake()
  139. else:
  140. self.push('-ERR Command not permitted when TLS active')
  141. def _do_tls_handshake(self):
  142. try:
  143. self.socket.do_handshake()
  144. except ssl.SSLError as err:
  145. if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
  146. ssl.SSL_ERROR_WANT_WRITE):
  147. return
  148. elif err.args[0] == ssl.SSL_ERROR_EOF:
  149. return self.handle_close()
  150. # TODO: SSLError does not expose alert information
  151. elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
  152. "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
  153. return self.handle_close()
  154. raise
  155. except OSError as err:
  156. if err.args[0] == errno.ECONNABORTED:
  157. return self.handle_close()
  158. else:
  159. self.tls_active = True
  160. self.tls_starting = False
  161. def handle_read(self):
  162. if self.tls_starting:
  163. self._do_tls_handshake()
  164. else:
  165. try:
  166. asynchat.async_chat.handle_read(self)
  167. except ssl.SSLEOFError:
  168. self.handle_close()
  169. class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
  170. handler = DummyPOP3Handler
  171. def __init__(self, address, af=socket.AF_INET):
  172. threading.Thread.__init__(self)
  173. asyncore.dispatcher.__init__(self)
  174. self.daemon = True
  175. self.create_socket(af, socket.SOCK_STREAM)
  176. self.bind(address)
  177. self.listen(5)
  178. self.active = False
  179. self.active_lock = threading.Lock()
  180. self.host, self.port = self.socket.getsockname()[:2]
  181. self.handler_instance = None
  182. def start(self):
  183. assert not self.active
  184. self.__flag = threading.Event()
  185. threading.Thread.start(self)
  186. self.__flag.wait()
  187. def run(self):
  188. self.active = True
  189. self.__flag.set()
  190. try:
  191. while self.active and asyncore.socket_map:
  192. with self.active_lock:
  193. asyncore.loop(timeout=0.1, count=1)
  194. finally:
  195. asyncore.close_all(ignore_all=True)
  196. def stop(self):
  197. assert self.active
  198. self.active = False
  199. self.join()
  200. def handle_accepted(self, conn, addr):
  201. self.handler_instance = self.handler(conn)
  202. def handle_connect(self):
  203. self.close()
  204. handle_read = handle_connect
  205. def writable(self):
  206. return 0
  207. def handle_error(self):
  208. raise
  209. class TestPOP3Class(TestCase):
  210. def assertOK(self, resp):
  211. self.assertTrue(resp.startswith(b"+OK"))
  212. def setUp(self):
  213. self.server = DummyPOP3Server((HOST, PORT))
  214. self.server.start()
  215. self.client = poplib.POP3(self.server.host, self.server.port,
  216. timeout=test_support.LOOPBACK_TIMEOUT)
  217. def tearDown(self):
  218. self.client.close()
  219. self.server.stop()
  220. # Explicitly clear the attribute to prevent dangling thread
  221. self.server = None
  222. def test_getwelcome(self):
  223. self.assertEqual(self.client.getwelcome(),
  224. b'+OK dummy pop3 server ready. <timestamp>')
  225. def test_exceptions(self):
  226. self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
  227. def test_user(self):
  228. self.assertOK(self.client.user('guido'))
  229. self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
  230. def test_pass_(self):
  231. self.assertOK(self.client.pass_('python'))
  232. self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
  233. def test_stat(self):
  234. self.assertEqual(self.client.stat(), (10, 100))
  235. def test_list(self):
  236. self.assertEqual(self.client.list()[1:],
  237. ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
  238. 25))
  239. self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
  240. def test_retr(self):
  241. expected = (b'+OK 116 bytes',
  242. [b'From: postmaster@python.org', b'Content-Type: text/plain',
  243. b'MIME-Version: 1.0', b'Subject: Dummy',
  244. b'', b'line1', b'line2', b'line3'],
  245. 113)
  246. foo = self.client.retr('foo')
  247. self.assertEqual(foo, expected)
  248. def test_too_long_lines(self):
  249. self.assertRaises(poplib.error_proto, self.client._shortcmd,
  250. 'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))
  251. def test_dele(self):
  252. self.assertOK(self.client.dele('foo'))
  253. def test_noop(self):
  254. self.assertOK(self.client.noop())
  255. def test_rpop(self):
  256. self.assertOK(self.client.rpop('foo'))
  257. @hashlib_helper.requires_hashdigest('md5', openssl=True)
  258. def test_apop_normal(self):
  259. self.assertOK(self.client.apop('foo', 'dummypassword'))
  260. @hashlib_helper.requires_hashdigest('md5', openssl=True)
  261. def test_apop_REDOS(self):
  262. # Replace welcome with very long evil welcome.
  263. # NB The upper bound on welcome length is currently 2048.
  264. # At this length, evil input makes each apop call take
  265. # on the order of milliseconds instead of microseconds.
  266. evil_welcome = b'+OK' + (b'<' * 1000000)
  267. with test_support.swap_attr(self.client, 'welcome', evil_welcome):
  268. # The evil welcome is invalid, so apop should throw.
  269. self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')
  270. def test_top(self):
  271. expected = (b'+OK 116 bytes',
  272. [b'From: postmaster@python.org', b'Content-Type: text/plain',
  273. b'MIME-Version: 1.0', b'Subject: Dummy', b'',
  274. b'line1', b'line2', b'line3'],
  275. 113)
  276. self.assertEqual(self.client.top(1, 1), expected)
  277. def test_uidl(self):
  278. self.client.uidl()
  279. self.client.uidl('foo')
  280. def test_utf8_raises_if_unsupported(self):
  281. self.server.handler.enable_UTF8 = False
  282. self.assertRaises(poplib.error_proto, self.client.utf8)
  283. def test_utf8(self):
  284. self.server.handler.enable_UTF8 = True
  285. expected = b'+OK I know RFC6856'
  286. result = self.client.utf8()
  287. self.assertEqual(result, expected)
  288. def test_capa(self):
  289. capa = self.client.capa()
  290. self.assertTrue('IMPLEMENTATION' in capa.keys())
  291. def test_quit(self):
  292. resp = self.client.quit()
  293. self.assertTrue(resp)
  294. self.assertIsNone(self.client.sock)
  295. self.assertIsNone(self.client.file)
  296. @requires_ssl
  297. def test_stls_capa(self):
  298. capa = self.client.capa()
  299. self.assertTrue('STLS' in capa.keys())
  300. @requires_ssl
  301. def test_stls(self):
  302. expected = b'+OK Begin TLS negotiation'
  303. resp = self.client.stls()
  304. self.assertEqual(resp, expected)
  305. @requires_ssl
  306. def test_stls_context(self):
  307. expected = b'+OK Begin TLS negotiation'
  308. ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  309. ctx.load_verify_locations(CAFILE)
  310. self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
  311. self.assertEqual(ctx.check_hostname, True)
  312. with self.assertRaises(ssl.CertificateError):
  313. resp = self.client.stls(context=ctx)
  314. self.client = poplib.POP3("localhost", self.server.port,
  315. timeout=test_support.LOOPBACK_TIMEOUT)
  316. resp = self.client.stls(context=ctx)
  317. self.assertEqual(resp, expected)
if SUPPORTS_SSL:
    from test.test_ftplib import SSLConnection

    class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler):
        """DummyPOP3Handler variant for connections that use TLS from the start.

        NOTE(review): secure_connection() comes from the SSLConnection mixin
        in test_ftplib, which is not visible here; presumably it wraps the
        accepted socket in TLS -- confirm against that module.
        """

        def __init__(self, conn):
            # DummyPOP3Handler.__init__ is deliberately not called: the
            # channel is secured first, and only then is the greeting pushed.
            asynchat.async_chat.__init__(self, conn)
            self.secure_connection()
            self.set_terminator(b"\r\n")
            self.in_buffer = []
            self.push('+OK dummy pop3 server ready. <timestamp>')
            # TLS counts as active from the beginning, so _get_capas() does
            # not advertise STLS and cmd_stls() answers with -ERR.
            self.tls_active = True
            self.tls_starting = False
  329. @requires_ssl
  330. class TestPOP3_SSLClass(TestPOP3Class):
  331. # repeat previous tests by using poplib.POP3_SSL
  332. def setUp(self):
  333. self.server = DummyPOP3Server((HOST, PORT))
  334. self.server.handler = DummyPOP3_SSLHandler
  335. self.server.start()
  336. self.client = poplib.POP3_SSL(self.server.host, self.server.port)
  337. def test__all__(self):
  338. self.assertIn('POP3_SSL', poplib.__all__)
  339. def test_context(self):
  340. ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  341. ctx.check_hostname = False
  342. ctx.verify_mode = ssl.CERT_NONE
  343. self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
  344. self.server.port, keyfile=CERTFILE, context=ctx)
  345. self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
  346. self.server.port, certfile=CERTFILE, context=ctx)
  347. self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
  348. self.server.port, keyfile=CERTFILE,
  349. certfile=CERTFILE, context=ctx)
  350. self.client.quit()
  351. self.client = poplib.POP3_SSL(self.server.host, self.server.port,
  352. context=ctx)
  353. self.assertIsInstance(self.client.sock, ssl.SSLSocket)
  354. self.assertIs(self.client.sock.context, ctx)
  355. self.assertTrue(self.client.noop().startswith(b'+OK'))
  356. def test_stls(self):
  357. self.assertRaises(poplib.error_proto, self.client.stls)
  358. test_stls_context = test_stls
  359. def test_stls_capa(self):
  360. capa = self.client.capa()
  361. self.assertFalse('STLS' in capa.keys())
  362. @requires_ssl
  363. class TestPOP3_TLSClass(TestPOP3Class):
  364. # repeat previous tests by using poplib.POP3.stls()
  365. def setUp(self):
  366. self.server = DummyPOP3Server((HOST, PORT))
  367. self.server.start()
  368. self.client = poplib.POP3(self.server.host, self.server.port,
  369. timeout=test_support.LOOPBACK_TIMEOUT)
  370. self.client.stls()
  371. def tearDown(self):
  372. if self.client.file is not None and self.client.sock is not None:
  373. try:
  374. self.client.quit()
  375. except poplib.error_proto:
  376. # happens in the test_too_long_lines case; the overlong
  377. # response will be treated as response to QUIT and raise
  378. # this exception
  379. self.client.close()
  380. self.server.stop()
  381. # Explicitly clear the attribute to prevent dangling thread
  382. self.server = None
  383. def test_stls(self):
  384. self.assertRaises(poplib.error_proto, self.client.stls)
  385. test_stls_context = test_stls
  386. def test_stls_capa(self):
  387. capa = self.client.capa()
  388. self.assertFalse(b'STLS' in capa.keys())
  389. class TestTimeouts(TestCase):
  390. def setUp(self):
  391. self.evt = threading.Event()
  392. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  393. self.sock.settimeout(60) # Safety net. Look issue 11812
  394. self.port = socket_helper.bind_port(self.sock)
  395. self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock))
  396. self.thread.daemon = True
  397. self.thread.start()
  398. self.evt.wait()
  399. def tearDown(self):
  400. self.thread.join()
  401. # Explicitly clear the attribute to prevent dangling thread
  402. self.thread = None
  403. def server(self, evt, serv):
  404. serv.listen()
  405. evt.set()
  406. try:
  407. conn, addr = serv.accept()
  408. conn.send(b"+ Hola mundo\n")
  409. conn.close()
  410. except TimeoutError:
  411. pass
  412. finally:
  413. serv.close()
  414. def testTimeoutDefault(self):
  415. self.assertIsNone(socket.getdefaulttimeout())
  416. socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT)
  417. try:
  418. pop = poplib.POP3(HOST, self.port)
  419. finally:
  420. socket.setdefaulttimeout(None)
  421. self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
  422. pop.close()
  423. def testTimeoutNone(self):
  424. self.assertIsNone(socket.getdefaulttimeout())
  425. socket.setdefaulttimeout(30)
  426. try:
  427. pop = poplib.POP3(HOST, self.port, timeout=None)
  428. finally:
  429. socket.setdefaulttimeout(None)
  430. self.assertIsNone(pop.sock.gettimeout())
  431. pop.close()
  432. def testTimeoutValue(self):
  433. pop = poplib.POP3(HOST, self.port, timeout=test_support.LOOPBACK_TIMEOUT)
  434. self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT)
  435. pop.close()
  436. with self.assertRaises(ValueError):
  437. poplib.POP3(HOST, self.port, timeout=0)
  438. def setUpModule():
  439. thread_info = threading_helper.threading_setup()
  440. unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()