test_nntplib.py 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646
  1. import io
  2. import socket
  3. import datetime
  4. import textwrap
  5. import unittest
  6. import functools
  7. import contextlib
  8. import os.path
  9. import re
  10. import threading
  11. from test import support
  12. from test.support import socket_helper, warnings_helper
  13. nntplib = warnings_helper.import_deprecated("nntplib")
  14. from nntplib import NNTP, GroupInfo
  15. from unittest.mock import patch
  16. try:
  17. import ssl
  18. except ImportError:
  19. ssl = None
  20. certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
  21. if ssl is not None:
  22. SSLError = ssl.SSLError
  23. else:
  24. class SSLError(Exception):
  25. """Non-existent exception class when we lack SSL support."""
  26. reason = "This will never be raised."
  27. # TODO:
  28. # - test the `file` arg to more commands
  29. # - test error conditions
  30. # - test auth and `usenetrc`
  31. class NetworkedNNTPTestsMixin:
  32. ssl_context = None
  33. def test_welcome(self):
  34. welcome = self.server.getwelcome()
  35. self.assertEqual(str, type(welcome))
  36. def test_help(self):
  37. resp, lines = self.server.help()
  38. self.assertTrue(resp.startswith("100 "), resp)
  39. for line in lines:
  40. self.assertEqual(str, type(line))
  41. def test_list(self):
  42. resp, groups = self.server.list()
  43. if len(groups) > 0:
  44. self.assertEqual(GroupInfo, type(groups[0]))
  45. self.assertEqual(str, type(groups[0].group))
  46. def test_list_active(self):
  47. resp, groups = self.server.list(self.GROUP_PAT)
  48. if len(groups) > 0:
  49. self.assertEqual(GroupInfo, type(groups[0]))
  50. self.assertEqual(str, type(groups[0].group))
  51. def test_unknown_command(self):
  52. with self.assertRaises(nntplib.NNTPPermanentError) as cm:
  53. self.server._shortcmd("XYZZY")
  54. resp = cm.exception.response
  55. self.assertTrue(resp.startswith("500 "), resp)
  56. def test_newgroups(self):
  57. # gmane gets a constant influx of new groups. In order not to stress
  58. # the server too much, we choose a recent date in the past.
  59. dt = datetime.date.today() - datetime.timedelta(days=7)
  60. resp, groups = self.server.newgroups(dt)
  61. if len(groups) > 0:
  62. self.assertIsInstance(groups[0], GroupInfo)
  63. self.assertIsInstance(groups[0].group, str)
  64. def test_description(self):
  65. def _check_desc(desc):
  66. # Sanity checks
  67. self.assertIsInstance(desc, str)
  68. self.assertNotIn(self.GROUP_NAME, desc)
  69. desc = self.server.description(self.GROUP_NAME)
  70. _check_desc(desc)
  71. # Another sanity check
  72. self.assertIn(self.DESC, desc)
  73. # With a pattern
  74. desc = self.server.description(self.GROUP_PAT)
  75. _check_desc(desc)
  76. # Shouldn't exist
  77. desc = self.server.description("zk.brrtt.baz")
  78. self.assertEqual(desc, '')
  79. def test_descriptions(self):
  80. resp, descs = self.server.descriptions(self.GROUP_PAT)
  81. # 215 for LIST NEWSGROUPS, 282 for XGTITLE
  82. self.assertTrue(
  83. resp.startswith("215 ") or resp.startswith("282 "), resp)
  84. self.assertIsInstance(descs, dict)
  85. desc = descs[self.GROUP_NAME]
  86. self.assertEqual(desc, self.server.description(self.GROUP_NAME))
  87. def test_group(self):
  88. result = self.server.group(self.GROUP_NAME)
  89. self.assertEqual(5, len(result))
  90. resp, count, first, last, group = result
  91. self.assertEqual(group, self.GROUP_NAME)
  92. self.assertIsInstance(count, int)
  93. self.assertIsInstance(first, int)
  94. self.assertIsInstance(last, int)
  95. self.assertLessEqual(first, last)
  96. self.assertTrue(resp.startswith("211 "), resp)
  97. def test_date(self):
  98. resp, date = self.server.date()
  99. self.assertIsInstance(date, datetime.datetime)
  100. # Sanity check
  101. self.assertGreaterEqual(date.year, 1995)
  102. self.assertLessEqual(date.year, 2030)
  103. def _check_art_dict(self, art_dict):
  104. # Some sanity checks for a field dictionary returned by OVER / XOVER
  105. self.assertIsInstance(art_dict, dict)
  106. # NNTP has 7 mandatory fields
  107. self.assertGreaterEqual(art_dict.keys(),
  108. {"subject", "from", "date", "message-id",
  109. "references", ":bytes", ":lines"}
  110. )
  111. for v in art_dict.values():
  112. self.assertIsInstance(v, (str, type(None)))
  113. def test_xover(self):
  114. resp, count, first, last, name = self.server.group(self.GROUP_NAME)
  115. resp, lines = self.server.xover(last - 5, last)
  116. if len(lines) == 0:
  117. self.skipTest("no articles retrieved")
  118. # The 'last' article is not necessarily part of the output (cancelled?)
  119. art_num, art_dict = lines[0]
  120. self.assertGreaterEqual(art_num, last - 5)
  121. self.assertLessEqual(art_num, last)
  122. self._check_art_dict(art_dict)
  123. @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
  124. ' is found for issue #28971')
  125. def test_over(self):
  126. resp, count, first, last, name = self.server.group(self.GROUP_NAME)
  127. start = last - 10
  128. # The "start-" article range form
  129. resp, lines = self.server.over((start, None))
  130. art_num, art_dict = lines[0]
  131. self._check_art_dict(art_dict)
  132. # The "start-end" article range form
  133. resp, lines = self.server.over((start, last))
  134. art_num, art_dict = lines[-1]
  135. # The 'last' article is not necessarily part of the output (cancelled?)
  136. self.assertGreaterEqual(art_num, start)
  137. self.assertLessEqual(art_num, last)
  138. self._check_art_dict(art_dict)
  139. # XXX The "message_id" form is unsupported by gmane
  140. # 503 Overview by message-ID unsupported
  141. def test_xhdr(self):
  142. resp, count, first, last, name = self.server.group(self.GROUP_NAME)
  143. resp, lines = self.server.xhdr('subject', last)
  144. for line in lines:
  145. self.assertEqual(str, type(line[1]))
  146. def check_article_resp(self, resp, article, art_num=None):
  147. self.assertIsInstance(article, nntplib.ArticleInfo)
  148. if art_num is not None:
  149. self.assertEqual(article.number, art_num)
  150. for line in article.lines:
  151. self.assertIsInstance(line, bytes)
  152. # XXX this could exceptionally happen...
  153. self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
  154. @unittest.skipIf(True, "FIXME: see bpo-32128")
  155. def test_article_head_body(self):
  156. resp, count, first, last, name = self.server.group(self.GROUP_NAME)
  157. # Try to find an available article
  158. for art_num in (last, first, last - 1):
  159. try:
  160. resp, head = self.server.head(art_num)
  161. except nntplib.NNTPTemporaryError as e:
  162. if not e.response.startswith("423 "):
  163. raise
  164. # "423 No such article" => choose another one
  165. continue
  166. break
  167. else:
  168. self.skipTest("could not find a suitable article number")
  169. self.assertTrue(resp.startswith("221 "), resp)
  170. self.check_article_resp(resp, head, art_num)
  171. resp, body = self.server.body(art_num)
  172. self.assertTrue(resp.startswith("222 "), resp)
  173. self.check_article_resp(resp, body, art_num)
  174. resp, article = self.server.article(art_num)
  175. self.assertTrue(resp.startswith("220 "), resp)
  176. self.check_article_resp(resp, article, art_num)
  177. # Tolerate running the tests from behind a NNTP virus checker
  178. denylist = lambda line: line.startswith(b'X-Antivirus')
  179. filtered_head_lines = [line for line in head.lines
  180. if not denylist(line)]
  181. filtered_lines = [line for line in article.lines
  182. if not denylist(line)]
  183. self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
  184. def test_capabilities(self):
  185. # The server under test implements NNTP version 2 and has a
  186. # couple of well-known capabilities. Just sanity check that we
  187. # got them.
  188. def _check_caps(caps):
  189. caps_list = caps['LIST']
  190. self.assertIsInstance(caps_list, (list, tuple))
  191. self.assertIn('OVERVIEW.FMT', caps_list)
  192. self.assertGreaterEqual(self.server.nntp_version, 2)
  193. _check_caps(self.server.getcapabilities())
  194. # This re-emits the command
  195. resp, caps = self.server.capabilities()
  196. _check_caps(caps)
  197. def test_zlogin(self):
  198. # This test must be the penultimate because further commands will be
  199. # refused.
  200. baduser = "notarealuser"
  201. badpw = "notarealpassword"
  202. # Check that bogus credentials cause failure
  203. self.assertRaises(nntplib.NNTPError, self.server.login,
  204. user=baduser, password=badpw, usenetrc=False)
  205. # FIXME: We should check that correct credentials succeed, but that
  206. # would require valid details for some server somewhere to be in the
  207. # test suite, I think. Gmane is anonymous, at least as used for the
  208. # other tests.
  209. def test_zzquit(self):
  210. # This test must be called last, hence the name
  211. cls = type(self)
  212. try:
  213. self.server.quit()
  214. finally:
  215. cls.server = None
  216. @classmethod
  217. def wrap_methods(cls):
  218. # Wrap all methods in a transient_internet() exception catcher
  219. # XXX put a generic version in test.support?
  220. def wrap_meth(meth):
  221. @functools.wraps(meth)
  222. def wrapped(self):
  223. with socket_helper.transient_internet(self.NNTP_HOST):
  224. meth(self)
  225. return wrapped
  226. for name in dir(cls):
  227. if not name.startswith('test_'):
  228. continue
  229. meth = getattr(cls, name)
  230. if not callable(meth):
  231. continue
  232. # Need to use a closure so that meth remains bound to its current
  233. # value
  234. setattr(cls, name, wrap_meth(meth))
  235. def test_timeout(self):
  236. with self.assertRaises(ValueError):
  237. self.NNTP_CLASS(self.NNTP_HOST, timeout=0, usenetrc=False)
  238. def test_with_statement(self):
  239. def is_connected():
  240. if not hasattr(server, 'file'):
  241. return False
  242. try:
  243. server.help()
  244. except (OSError, EOFError):
  245. return False
  246. return True
  247. kwargs = dict(
  248. timeout=support.INTERNET_TIMEOUT,
  249. usenetrc=False
  250. )
  251. if self.ssl_context is not None:
  252. kwargs["ssl_context"] = self.ssl_context
  253. try:
  254. server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
  255. with server:
  256. self.assertTrue(is_connected())
  257. self.assertTrue(server.help())
  258. self.assertFalse(is_connected())
  259. server = self.NNTP_CLASS(self.NNTP_HOST, **kwargs)
  260. with server:
  261. server.quit()
  262. self.assertFalse(is_connected())
  263. except SSLError as ssl_err:
  264. # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
  265. if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
  266. raise unittest.SkipTest(f"Got {ssl_err} connecting "
  267. f"to {self.NNTP_HOST!r}")
  268. raise
  269. NetworkedNNTPTestsMixin.wrap_methods()
  270. EOF_ERRORS = (EOFError,)
  271. if ssl is not None:
  272. EOF_ERRORS += (ssl.SSLEOFError,)
  273. class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
  274. # This server supports STARTTLS (gmane doesn't)
  275. NNTP_HOST = 'news.trigofacile.com'
  276. GROUP_NAME = 'fr.comp.lang.python'
  277. GROUP_PAT = 'fr.comp.lang.*'
  278. DESC = 'Python'
  279. NNTP_CLASS = NNTP
  280. @classmethod
  281. def setUpClass(cls):
  282. support.requires("network")
  283. kwargs = dict(
  284. timeout=support.INTERNET_TIMEOUT,
  285. usenetrc=False
  286. )
  287. if cls.ssl_context is not None:
  288. kwargs["ssl_context"] = cls.ssl_context
  289. with socket_helper.transient_internet(cls.NNTP_HOST):
  290. try:
  291. cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, **kwargs)
  292. except SSLError as ssl_err:
  293. # matches "[SSL: DH_KEY_TOO_SMALL] dh key too small"
  294. if re.search(r'(?i)KEY.TOO.SMALL', ssl_err.reason):
  295. raise unittest.SkipTest(f"{cls} got {ssl_err} connecting "
  296. f"to {cls.NNTP_HOST!r}")
  297. print(cls.NNTP_HOST)
  298. raise
  299. except EOF_ERRORS:
  300. raise unittest.SkipTest(f"{cls} got EOF error on connecting "
  301. f"to {cls.NNTP_HOST!r}")
  302. @classmethod
  303. def tearDownClass(cls):
  304. if cls.server is not None:
  305. cls.server.quit()
  306. @unittest.skipUnless(ssl, 'requires SSL support')
  307. class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
  308. # Technical limits for this public NNTP server (see http://www.aioe.org):
  309. # "Only two concurrent connections per IP address are allowed and
  310. # 400 connections per day are accepted from each IP address."
  311. NNTP_HOST = 'nntp.aioe.org'
  312. # bpo-42794: aioe.test is one of the official groups on this server
  313. # used for testing: https://news.aioe.org/manual/aioe-hierarchy/
  314. GROUP_NAME = 'aioe.test'
  315. GROUP_PAT = 'aioe.*'
  316. DESC = 'test'
  317. NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
  318. # Disabled as it produces too much data
  319. test_list = None
  320. # Disabled as the connection will already be encrypted.
  321. test_starttls = None
  322. if ssl is not None:
  323. ssl_context = ssl._create_unverified_context()
  324. ssl_context.set_ciphers("DEFAULT")
  325. ssl_context.maximum_version = ssl.TLSVersion.TLSv1_2
  326. #
  327. # Non-networked tests using a local server (or something mocking it).
  328. #
  329. class _NNTPServerIO(io.RawIOBase):
  330. """A raw IO object allowing NNTP commands to be received and processed
  331. by a handler. The handler can push responses which can then be read
  332. from the IO object."""
  333. def __init__(self, handler):
  334. io.RawIOBase.__init__(self)
  335. # The channel from the client
  336. self.c2s = io.BytesIO()
  337. # The channel to the client
  338. self.s2c = io.BytesIO()
  339. self.handler = handler
  340. self.handler.start(self.c2s.readline, self.push_data)
  341. def readable(self):
  342. return True
  343. def writable(self):
  344. return True
  345. def push_data(self, data):
  346. """Push (buffer) some data to send to the client."""
  347. pos = self.s2c.tell()
  348. self.s2c.seek(0, 2)
  349. self.s2c.write(data)
  350. self.s2c.seek(pos)
  351. def write(self, b):
  352. """The client sends us some data"""
  353. pos = self.c2s.tell()
  354. self.c2s.write(b)
  355. self.c2s.seek(pos)
  356. self.handler.process_pending()
  357. return len(b)
  358. def readinto(self, buf):
  359. """The client wants to read a response"""
  360. self.handler.process_pending()
  361. b = self.s2c.read(len(buf))
  362. n = len(b)
  363. buf[:n] = b
  364. return n
  365. def make_mock_file(handler):
  366. sio = _NNTPServerIO(handler)
  367. # Using BufferedRWPair instead of BufferedRandom ensures the file
  368. # isn't seekable.
  369. file = io.BufferedRWPair(sio, sio)
  370. return (sio, file)
  371. class NNTPServer(nntplib.NNTP):
  372. def __init__(self, f, host, readermode=None):
  373. self.file = f
  374. self.host = host
  375. self._base_init(readermode)
  376. def _close(self):
  377. self.file.close()
  378. del self.file
  379. class MockedNNTPTestsMixin:
  380. # Override in derived classes
  381. handler_class = None
  382. def setUp(self):
  383. super().setUp()
  384. self.make_server()
  385. def tearDown(self):
  386. super().tearDown()
  387. del self.server
  388. def make_server(self, *args, **kwargs):
  389. self.handler = self.handler_class()
  390. self.sio, file = make_mock_file(self.handler)
  391. self.server = NNTPServer(file, 'test.server', *args, **kwargs)
  392. return self.server
  393. class MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
  394. def setUp(self):
  395. super().setUp()
  396. self.make_server(readermode=True)
  397. class NNTPv1Handler:
  398. """A handler for RFC 977"""
  399. welcome = "200 NNTP mock server"
  400. def start(self, readline, push_data):
  401. self.in_body = False
  402. self.allow_posting = True
  403. self._readline = readline
  404. self._push_data = push_data
  405. self._logged_in = False
  406. self._user_sent = False
  407. # Our welcome
  408. self.handle_welcome()
  409. def _decode(self, data):
  410. return str(data, "utf-8", "surrogateescape")
  411. def process_pending(self):
  412. if self.in_body:
  413. while True:
  414. line = self._readline()
  415. if not line:
  416. return
  417. self.body.append(line)
  418. if line == b".\r\n":
  419. break
  420. try:
  421. meth, tokens = self.body_callback
  422. meth(*tokens, body=self.body)
  423. finally:
  424. self.body_callback = None
  425. self.body = None
  426. self.in_body = False
  427. while True:
  428. line = self._decode(self._readline())
  429. if not line:
  430. return
  431. if not line.endswith("\r\n"):
  432. raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
  433. line = line[:-2]
  434. cmd, *tokens = line.split()
  435. #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
  436. meth = getattr(self, "handle_" + cmd.upper(), None)
  437. if meth is None:
  438. self.handle_unknown()
  439. else:
  440. try:
  441. meth(*tokens)
  442. except Exception as e:
  443. raise ValueError("command failed: {!r}".format(line)) from e
  444. else:
  445. if self.in_body:
  446. self.body_callback = meth, tokens
  447. self.body = []
  448. def expect_body(self):
  449. """Flag that the client is expected to post a request body"""
  450. self.in_body = True
  451. def push_data(self, data):
  452. """Push some binary data"""
  453. self._push_data(data)
  454. def push_lit(self, lit):
  455. """Push a string literal"""
  456. lit = textwrap.dedent(lit)
  457. lit = "\r\n".join(lit.splitlines()) + "\r\n"
  458. lit = lit.encode('utf-8')
  459. self.push_data(lit)
  460. def handle_unknown(self):
  461. self.push_lit("500 What?")
  462. def handle_welcome(self):
  463. self.push_lit(self.welcome)
  464. def handle_QUIT(self):
  465. self.push_lit("205 Bye!")
  466. def handle_DATE(self):
  467. self.push_lit("111 20100914001155")
  468. def handle_GROUP(self, group):
  469. if group == "fr.comp.lang.python":
  470. self.push_lit("211 486 761 1265 fr.comp.lang.python")
  471. else:
  472. self.push_lit("411 No such group {}".format(group))
  473. def handle_HELP(self):
  474. self.push_lit("""\
  475. 100 Legal commands
  476. authinfo user Name|pass Password|generic <prog> <args>
  477. date
  478. help
  479. Report problems to <root@example.org>
  480. .""")
  481. def handle_STAT(self, message_spec=None):
  482. if message_spec is None:
  483. self.push_lit("412 No newsgroup selected")
  484. elif message_spec == "3000234":
  485. self.push_lit("223 3000234 <45223423@example.com>")
  486. elif message_spec == "<45223423@example.com>":
  487. self.push_lit("223 0 <45223423@example.com>")
  488. else:
  489. self.push_lit("430 No Such Article Found")
  490. def handle_NEXT(self):
  491. self.push_lit("223 3000237 <668929@example.org> retrieved")
  492. def handle_LAST(self):
  493. self.push_lit("223 3000234 <45223423@example.com> retrieved")
  494. def handle_LIST(self, action=None, param=None):
  495. if action is None:
  496. self.push_lit("""\
  497. 215 Newsgroups in form "group high low flags".
  498. comp.lang.python 0000052340 0000002828 y
  499. comp.lang.python.announce 0000001153 0000000993 m
  500. free.it.comp.lang.python 0000000002 0000000002 y
  501. fr.comp.lang.python 0000001254 0000000760 y
  502. free.it.comp.lang.python.learner 0000000000 0000000001 y
  503. tw.bbs.comp.lang.python 0000000304 0000000304 y
  504. .""")
  505. elif action == "ACTIVE":
  506. if param == "*distutils*":
  507. self.push_lit("""\
  508. 215 Newsgroups in form "group high low flags"
  509. gmane.comp.python.distutils.devel 0000014104 0000000001 m
  510. gmane.comp.python.distutils.cvs 0000000000 0000000001 m
  511. .""")
  512. else:
  513. self.push_lit("""\
  514. 215 Newsgroups in form "group high low flags"
  515. .""")
  516. elif action == "OVERVIEW.FMT":
  517. self.push_lit("""\
  518. 215 Order of fields in overview database.
  519. Subject:
  520. From:
  521. Date:
  522. Message-ID:
  523. References:
  524. Bytes:
  525. Lines:
  526. Xref:full
  527. .""")
  528. elif action == "NEWSGROUPS":
  529. assert param is not None
  530. if param == "comp.lang.python":
  531. self.push_lit("""\
  532. 215 Descriptions in form "group description".
  533. comp.lang.python\tThe Python computer language.
  534. .""")
  535. elif param == "comp.lang.python*":
  536. self.push_lit("""\
  537. 215 Descriptions in form "group description".
  538. comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
  539. comp.lang.python\tThe Python computer language.
  540. .""")
  541. else:
  542. self.push_lit("""\
  543. 215 Descriptions in form "group description".
  544. .""")
  545. else:
  546. self.push_lit('501 Unknown LIST keyword')
  547. def handle_NEWNEWS(self, group, date_str, time_str):
  548. # We hard code different return messages depending on passed
  549. # argument and date syntax.
  550. if (group == "comp.lang.python" and date_str == "20100913"
  551. and time_str == "082004"):
  552. # Date was passed in RFC 3977 format (NNTP "v2")
  553. self.push_lit("""\
  554. 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
  555. <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
  556. <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
  557. .""")
  558. elif (group == "comp.lang.python" and date_str == "100913"
  559. and time_str == "082004"):
  560. # Date was passed in RFC 977 format (NNTP "v1")
  561. self.push_lit("""\
  562. 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
  563. <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
  564. <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
  565. .""")
  566. elif (group == 'comp.lang.python' and
  567. date_str in ('20100101', '100101') and
  568. time_str == '090000'):
  569. self.push_lit('too long line' * 3000 +
  570. '\n.')
  571. else:
  572. self.push_lit("""\
  573. 230 An empty list of newsarticles follows
  574. .""")
  575. # (Note for experiments: many servers disable NEWNEWS.
  576. # As of this writing, sicinfo3.epfl.ch doesn't.)
  577. def handle_XOVER(self, message_spec):
  578. if message_spec == "57-59":
  579. self.push_lit(
  580. "224 Overview information for 57-58 follows\n"
  581. "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
  582. "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
  583. "\tSat, 19 Jun 2010 18:04:08 -0400"
  584. "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
  585. "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
  586. "\tXref: news.gmane.io gmane.comp.python.authors:57"
  587. "\n"
  588. "58\tLooking for a few good bloggers"
  589. "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
  590. "\tThu, 22 Jul 2010 09:14:14 -0400"
  591. "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
  592. "\t\t6683\t16"
  593. "\t"
  594. "\n"
  595. # A UTF-8 overview line from fr.comp.lang.python
  596. "59\tRe: Message d'erreur incompréhensible (par moi)"
  597. "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
  598. "\tWed, 15 Sep 2010 18:09:15 +0200"
  599. "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
  600. "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
  601. "\tXref: saria.nerim.net fr.comp.lang.python:1265"
  602. "\n"
  603. ".\n")
  604. else:
  605. self.push_lit("""\
  606. 224 No articles
  607. .""")
  608. def handle_POST(self, *, body=None):
  609. if body is None:
  610. if self.allow_posting:
  611. self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
  612. self.expect_body()
  613. else:
  614. self.push_lit("440 Posting not permitted")
  615. else:
  616. assert self.allow_posting
  617. self.push_lit("240 Article received OK")
  618. self.posted_body = body
  619. def handle_IHAVE(self, message_id, *, body=None):
  620. if body is None:
  621. if (self.allow_posting and
  622. message_id == "<i.am.an.article.you.will.want@example.com>"):
  623. self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
  624. self.expect_body()
  625. else:
  626. self.push_lit("435 Article not wanted")
  627. else:
  628. assert self.allow_posting
  629. self.push_lit("235 Article transferred OK")
  630. self.posted_body = body
  631. sample_head = """\
  632. From: "Demo User" <nobody@example.net>
  633. Subject: I am just a test article
  634. Content-Type: text/plain; charset=UTF-8; format=flowed
  635. Message-ID: <i.am.an.article.you.will.want@example.com>"""
  636. sample_body = """\
  637. This is just a test article.
  638. ..Here is a dot-starting line.
  639. -- Signed by Andr\xe9."""
  640. sample_article = sample_head + "\n\n" + sample_body
  641. def handle_ARTICLE(self, message_spec=None):
  642. if message_spec is None:
  643. self.push_lit("220 3000237 <45223423@example.com>")
  644. elif message_spec == "<45223423@example.com>":
  645. self.push_lit("220 0 <45223423@example.com>")
  646. elif message_spec == "3000234":
  647. self.push_lit("220 3000234 <45223423@example.com>")
  648. else:
  649. self.push_lit("430 No Such Article Found")
  650. return
  651. self.push_lit(self.sample_article)
  652. self.push_lit(".")
  653. def handle_HEAD(self, message_spec=None):
  654. if message_spec is None:
  655. self.push_lit("221 3000237 <45223423@example.com>")
  656. elif message_spec == "<45223423@example.com>":
  657. self.push_lit("221 0 <45223423@example.com>")
  658. elif message_spec == "3000234":
  659. self.push_lit("221 3000234 <45223423@example.com>")
  660. else:
  661. self.push_lit("430 No Such Article Found")
  662. return
  663. self.push_lit(self.sample_head)
  664. self.push_lit(".")
  665. def handle_BODY(self, message_spec=None):
  666. if message_spec is None:
  667. self.push_lit("222 3000237 <45223423@example.com>")
  668. elif message_spec == "<45223423@example.com>":
  669. self.push_lit("222 0 <45223423@example.com>")
  670. elif message_spec == "3000234":
  671. self.push_lit("222 3000234 <45223423@example.com>")
  672. else:
  673. self.push_lit("430 No Such Article Found")
  674. return
  675. self.push_lit(self.sample_body)
  676. self.push_lit(".")
  677. def handle_AUTHINFO(self, cred_type, data):
  678. if self._logged_in:
  679. self.push_lit('502 Already Logged In')
  680. elif cred_type == 'user':
  681. if self._user_sent:
  682. self.push_lit('482 User Credential Already Sent')
  683. else:
  684. self.push_lit('381 Password Required')
  685. self._user_sent = True
  686. elif cred_type == 'pass':
  687. self.push_lit('281 Login Successful')
  688. self._logged_in = True
  689. else:
  690. raise Exception('Unknown cred type {}'.format(cred_type))
  691. class NNTPv2Handler(NNTPv1Handler):
  692. """A handler for RFC 3977 (NNTP "v2")"""
  693. def handle_CAPABILITIES(self):
  694. fmt = """\
  695. 101 Capability list:
  696. VERSION 2 3
  697. IMPLEMENTATION INN 2.5.1{}
  698. HDR
  699. LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
  700. OVER
  701. POST
  702. READER
  703. ."""
  704. if not self._logged_in:
  705. self.push_lit(fmt.format('\n AUTHINFO USER'))
  706. else:
  707. self.push_lit(fmt.format(''))
  708. def handle_MODE(self, _):
  709. raise Exception('MODE READER sent despite READER has been advertised')
  710. def handle_OVER(self, message_spec=None):
  711. return self.handle_XOVER(message_spec)
  712. class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
  713. """A handler that allows CAPABILITIES only after login"""
  714. def handle_CAPABILITIES(self):
  715. if not self._logged_in:
  716. self.push_lit('480 You must log in.')
  717. else:
  718. super().handle_CAPABILITIES()
  719. class ModeSwitchingNNTPv2Handler(NNTPv2Handler):
  720. """A server that starts in transit mode"""
  721. def __init__(self):
  722. self._switched = False
  723. def handle_CAPABILITIES(self):
  724. fmt = """\
  725. 101 Capability list:
  726. VERSION 2 3
  727. IMPLEMENTATION INN 2.5.1
  728. HDR
  729. LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
  730. OVER
  731. POST
  732. {}READER
  733. ."""
  734. if self._switched:
  735. self.push_lit(fmt.format(''))
  736. else:
  737. self.push_lit(fmt.format('MODE-'))
  738. def handle_MODE(self, what):
  739. assert not self._switched and what == 'reader'
  740. self._switched = True
  741. self.push_lit('200 Posting allowed')
  742. class NNTPv1v2TestsMixin:
  743. def setUp(self):
  744. super().setUp()
  745. def test_welcome(self):
  746. self.assertEqual(self.server.welcome, self.handler.welcome)
  747. def test_authinfo(self):
  748. if self.nntp_version == 2:
  749. self.assertIn('AUTHINFO', self.server._caps)
  750. self.server.login('testuser', 'testpw')
  751. # if AUTHINFO is gone from _caps we also know that getcapabilities()
  752. # has been called after login as it should
  753. self.assertNotIn('AUTHINFO', self.server._caps)
  754. def test_date(self):
  755. resp, date = self.server.date()
  756. self.assertEqual(resp, "111 20100914001155")
  757. self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
  758. def test_quit(self):
  759. self.assertFalse(self.sio.closed)
  760. resp = self.server.quit()
  761. self.assertEqual(resp, "205 Bye!")
  762. self.assertTrue(self.sio.closed)
  763. def test_help(self):
  764. resp, help = self.server.help()
  765. self.assertEqual(resp, "100 Legal commands")
  766. self.assertEqual(help, [
  767. ' authinfo user Name|pass Password|generic <prog> <args>',
  768. ' date',
  769. ' help',
  770. 'Report problems to <root@example.org>',
  771. ])
  772. def test_list(self):
  773. resp, groups = self.server.list()
  774. self.assertEqual(len(groups), 6)
  775. g = groups[1]
  776. self.assertEqual(g,
  777. GroupInfo("comp.lang.python.announce", "0000001153",
  778. "0000000993", "m"))
  779. resp, groups = self.server.list("*distutils*")
  780. self.assertEqual(len(groups), 2)
  781. g = groups[0]
  782. self.assertEqual(g,
  783. GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
  784. "0000000001", "m"))
  785. def test_stat(self):
  786. resp, art_num, message_id = self.server.stat(3000234)
  787. self.assertEqual(resp, "223 3000234 <45223423@example.com>")
  788. self.assertEqual(art_num, 3000234)
  789. self.assertEqual(message_id, "<45223423@example.com>")
  790. resp, art_num, message_id = self.server.stat("<45223423@example.com>")
  791. self.assertEqual(resp, "223 0 <45223423@example.com>")
  792. self.assertEqual(art_num, 0)
  793. self.assertEqual(message_id, "<45223423@example.com>")
  794. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  795. self.server.stat("<non.existent.id>")
  796. self.assertEqual(cm.exception.response, "430 No Such Article Found")
  797. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  798. self.server.stat()
  799. self.assertEqual(cm.exception.response, "412 No newsgroup selected")
  800. def test_next(self):
  801. resp, art_num, message_id = self.server.next()
  802. self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
  803. self.assertEqual(art_num, 3000237)
  804. self.assertEqual(message_id, "<668929@example.org>")
  805. def test_last(self):
  806. resp, art_num, message_id = self.server.last()
  807. self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
  808. self.assertEqual(art_num, 3000234)
  809. self.assertEqual(message_id, "<45223423@example.com>")
  810. def test_description(self):
  811. desc = self.server.description("comp.lang.python")
  812. self.assertEqual(desc, "The Python computer language.")
  813. desc = self.server.description("comp.lang.pythonx")
  814. self.assertEqual(desc, "")
  815. def test_descriptions(self):
  816. resp, groups = self.server.descriptions("comp.lang.python")
  817. self.assertEqual(resp, '215 Descriptions in form "group description".')
  818. self.assertEqual(groups, {
  819. "comp.lang.python": "The Python computer language.",
  820. })
  821. resp, groups = self.server.descriptions("comp.lang.python*")
  822. self.assertEqual(groups, {
  823. "comp.lang.python": "The Python computer language.",
  824. "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
  825. })
  826. resp, groups = self.server.descriptions("comp.lang.pythonx")
  827. self.assertEqual(groups, {})
  828. def test_group(self):
  829. resp, count, first, last, group = self.server.group("fr.comp.lang.python")
  830. self.assertTrue(resp.startswith("211 "), resp)
  831. self.assertEqual(first, 761)
  832. self.assertEqual(last, 1265)
  833. self.assertEqual(count, 486)
  834. self.assertEqual(group, "fr.comp.lang.python")
  835. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  836. self.server.group("comp.lang.python.devel")
  837. exc = cm.exception
  838. self.assertTrue(exc.response.startswith("411 No such group"),
  839. exc.response)
  840. def test_newnews(self):
  841. # NEWNEWS comp.lang.python [20]100913 082004
  842. dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
  843. resp, ids = self.server.newnews("comp.lang.python", dt)
  844. expected = (
  845. "230 list of newsarticles (NNTP v{0}) "
  846. "created after Mon Sep 13 08:20:04 2010 follows"
  847. ).format(self.nntp_version)
  848. self.assertEqual(resp, expected)
  849. self.assertEqual(ids, [
  850. "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
  851. "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
  852. ])
  853. # NEWNEWS fr.comp.lang.python [20]100913 082004
  854. dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
  855. resp, ids = self.server.newnews("fr.comp.lang.python", dt)
  856. self.assertEqual(resp, "230 An empty list of newsarticles follows")
  857. self.assertEqual(ids, [])
  858. def _check_article_body(self, lines):
  859. self.assertEqual(len(lines), 4)
  860. self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
  861. self.assertEqual(lines[-2], b"")
  862. self.assertEqual(lines[-3], b".Here is a dot-starting line.")
  863. self.assertEqual(lines[-4], b"This is just a test article.")
  864. def _check_article_head(self, lines):
  865. self.assertEqual(len(lines), 4)
  866. self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
  867. self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
  868. def _check_article_data(self, lines):
  869. self.assertEqual(len(lines), 9)
  870. self._check_article_head(lines[:4])
  871. self._check_article_body(lines[-4:])
  872. self.assertEqual(lines[4], b"")
  873. def test_article(self):
  874. # ARTICLE
  875. resp, info = self.server.article()
  876. self.assertEqual(resp, "220 3000237 <45223423@example.com>")
  877. art_num, message_id, lines = info
  878. self.assertEqual(art_num, 3000237)
  879. self.assertEqual(message_id, "<45223423@example.com>")
  880. self._check_article_data(lines)
  881. # ARTICLE num
  882. resp, info = self.server.article(3000234)
  883. self.assertEqual(resp, "220 3000234 <45223423@example.com>")
  884. art_num, message_id, lines = info
  885. self.assertEqual(art_num, 3000234)
  886. self.assertEqual(message_id, "<45223423@example.com>")
  887. self._check_article_data(lines)
  888. # ARTICLE id
  889. resp, info = self.server.article("<45223423@example.com>")
  890. self.assertEqual(resp, "220 0 <45223423@example.com>")
  891. art_num, message_id, lines = info
  892. self.assertEqual(art_num, 0)
  893. self.assertEqual(message_id, "<45223423@example.com>")
  894. self._check_article_data(lines)
  895. # Non-existent id
  896. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  897. self.server.article("<non-existent@example.com>")
  898. self.assertEqual(cm.exception.response, "430 No Such Article Found")
  899. def test_article_file(self):
  900. # With a "file" argument
  901. f = io.BytesIO()
  902. resp, info = self.server.article(file=f)
  903. self.assertEqual(resp, "220 3000237 <45223423@example.com>")
  904. art_num, message_id, lines = info
  905. self.assertEqual(art_num, 3000237)
  906. self.assertEqual(message_id, "<45223423@example.com>")
  907. self.assertEqual(lines, [])
  908. data = f.getvalue()
  909. self.assertTrue(data.startswith(
  910. b'From: "Demo User" <nobody@example.net>\r\n'
  911. b'Subject: I am just a test article\r\n'
  912. ), ascii(data))
  913. self.assertTrue(data.endswith(
  914. b'This is just a test article.\r\n'
  915. b'.Here is a dot-starting line.\r\n'
  916. b'\r\n'
  917. b'-- Signed by Andr\xc3\xa9.\r\n'
  918. ), ascii(data))
  919. def test_head(self):
  920. # HEAD
  921. resp, info = self.server.head()
  922. self.assertEqual(resp, "221 3000237 <45223423@example.com>")
  923. art_num, message_id, lines = info
  924. self.assertEqual(art_num, 3000237)
  925. self.assertEqual(message_id, "<45223423@example.com>")
  926. self._check_article_head(lines)
  927. # HEAD num
  928. resp, info = self.server.head(3000234)
  929. self.assertEqual(resp, "221 3000234 <45223423@example.com>")
  930. art_num, message_id, lines = info
  931. self.assertEqual(art_num, 3000234)
  932. self.assertEqual(message_id, "<45223423@example.com>")
  933. self._check_article_head(lines)
  934. # HEAD id
  935. resp, info = self.server.head("<45223423@example.com>")
  936. self.assertEqual(resp, "221 0 <45223423@example.com>")
  937. art_num, message_id, lines = info
  938. self.assertEqual(art_num, 0)
  939. self.assertEqual(message_id, "<45223423@example.com>")
  940. self._check_article_head(lines)
  941. # Non-existent id
  942. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  943. self.server.head("<non-existent@example.com>")
  944. self.assertEqual(cm.exception.response, "430 No Such Article Found")
  945. def test_head_file(self):
  946. f = io.BytesIO()
  947. resp, info = self.server.head(file=f)
  948. self.assertEqual(resp, "221 3000237 <45223423@example.com>")
  949. art_num, message_id, lines = info
  950. self.assertEqual(art_num, 3000237)
  951. self.assertEqual(message_id, "<45223423@example.com>")
  952. self.assertEqual(lines, [])
  953. data = f.getvalue()
  954. self.assertTrue(data.startswith(
  955. b'From: "Demo User" <nobody@example.net>\r\n'
  956. b'Subject: I am just a test article\r\n'
  957. ), ascii(data))
  958. self.assertFalse(data.endswith(
  959. b'This is just a test article.\r\n'
  960. b'.Here is a dot-starting line.\r\n'
  961. b'\r\n'
  962. b'-- Signed by Andr\xc3\xa9.\r\n'
  963. ), ascii(data))
  964. def test_body(self):
  965. # BODY
  966. resp, info = self.server.body()
  967. self.assertEqual(resp, "222 3000237 <45223423@example.com>")
  968. art_num, message_id, lines = info
  969. self.assertEqual(art_num, 3000237)
  970. self.assertEqual(message_id, "<45223423@example.com>")
  971. self._check_article_body(lines)
  972. # BODY num
  973. resp, info = self.server.body(3000234)
  974. self.assertEqual(resp, "222 3000234 <45223423@example.com>")
  975. art_num, message_id, lines = info
  976. self.assertEqual(art_num, 3000234)
  977. self.assertEqual(message_id, "<45223423@example.com>")
  978. self._check_article_body(lines)
  979. # BODY id
  980. resp, info = self.server.body("<45223423@example.com>")
  981. self.assertEqual(resp, "222 0 <45223423@example.com>")
  982. art_num, message_id, lines = info
  983. self.assertEqual(art_num, 0)
  984. self.assertEqual(message_id, "<45223423@example.com>")
  985. self._check_article_body(lines)
  986. # Non-existent id
  987. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  988. self.server.body("<non-existent@example.com>")
  989. self.assertEqual(cm.exception.response, "430 No Such Article Found")
  990. def test_body_file(self):
  991. f = io.BytesIO()
  992. resp, info = self.server.body(file=f)
  993. self.assertEqual(resp, "222 3000237 <45223423@example.com>")
  994. art_num, message_id, lines = info
  995. self.assertEqual(art_num, 3000237)
  996. self.assertEqual(message_id, "<45223423@example.com>")
  997. self.assertEqual(lines, [])
  998. data = f.getvalue()
  999. self.assertFalse(data.startswith(
  1000. b'From: "Demo User" <nobody@example.net>\r\n'
  1001. b'Subject: I am just a test article\r\n'
  1002. ), ascii(data))
  1003. self.assertTrue(data.endswith(
  1004. b'This is just a test article.\r\n'
  1005. b'.Here is a dot-starting line.\r\n'
  1006. b'\r\n'
  1007. b'-- Signed by Andr\xc3\xa9.\r\n'
  1008. ), ascii(data))
  1009. def check_over_xover_resp(self, resp, overviews):
  1010. self.assertTrue(resp.startswith("224 "), resp)
  1011. self.assertEqual(len(overviews), 3)
  1012. art_num, over = overviews[0]
  1013. self.assertEqual(art_num, 57)
  1014. self.assertEqual(over, {
  1015. "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
  1016. "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
  1017. "date": "Sat, 19 Jun 2010 18:04:08 -0400",
  1018. "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
  1019. "references": "<hvalf7$ort$1@dough.gmane.org>",
  1020. ":bytes": "7103",
  1021. ":lines": "16",
  1022. "xref": "news.gmane.io gmane.comp.python.authors:57"
  1023. })
  1024. art_num, over = overviews[1]
  1025. self.assertEqual(over["xref"], None)
  1026. art_num, over = overviews[2]
  1027. self.assertEqual(over["subject"],
  1028. "Re: Message d'erreur incompréhensible (par moi)")
  1029. def test_xover(self):
  1030. resp, overviews = self.server.xover(57, 59)
  1031. self.check_over_xover_resp(resp, overviews)
  1032. def test_over(self):
  1033. # In NNTP "v1", this will fallback on XOVER
  1034. resp, overviews = self.server.over((57, 59))
  1035. self.check_over_xover_resp(resp, overviews)
  1036. sample_post = (
  1037. b'From: "Demo User" <nobody@example.net>\r\n'
  1038. b'Subject: I am just a test article\r\n'
  1039. b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
  1040. b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
  1041. b'\r\n'
  1042. b'This is just a test article.\r\n'
  1043. b'.Here is a dot-starting line.\r\n'
  1044. b'\r\n'
  1045. b'-- Signed by Andr\xc3\xa9.\r\n'
  1046. )
  1047. def _check_posted_body(self):
  1048. # Check the raw body as received by the server
  1049. lines = self.handler.posted_body
  1050. # One additional line for the "." terminator
  1051. self.assertEqual(len(lines), 10)
  1052. self.assertEqual(lines[-1], b'.\r\n')
  1053. self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
  1054. self.assertEqual(lines[-3], b'\r\n')
  1055. self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
  1056. self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
  1057. def _check_post_ihave_sub(self, func, *args, file_factory):
  1058. # First the prepared post with CRLF endings
  1059. post = self.sample_post
  1060. func_args = args + (file_factory(post),)
  1061. self.handler.posted_body = None
  1062. resp = func(*func_args)
  1063. self._check_posted_body()
  1064. # Then the same post with "normal" line endings - they should be
  1065. # converted by NNTP.post and NNTP.ihave.
  1066. post = self.sample_post.replace(b"\r\n", b"\n")
  1067. func_args = args + (file_factory(post),)
  1068. self.handler.posted_body = None
  1069. resp = func(*func_args)
  1070. self._check_posted_body()
  1071. return resp
  1072. def check_post_ihave(self, func, success_resp, *args):
  1073. # With a bytes object
  1074. resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
  1075. self.assertEqual(resp, success_resp)
  1076. # With a bytearray object
  1077. resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
  1078. self.assertEqual(resp, success_resp)
  1079. # With a file object
  1080. resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
  1081. self.assertEqual(resp, success_resp)
  1082. # With an iterable of terminated lines
  1083. def iterlines(b):
  1084. return iter(b.splitlines(keepends=True))
  1085. resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
  1086. self.assertEqual(resp, success_resp)
  1087. # With an iterable of non-terminated lines
  1088. def iterlines(b):
  1089. return iter(b.splitlines(keepends=False))
  1090. resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
  1091. self.assertEqual(resp, success_resp)
  1092. def test_post(self):
  1093. self.check_post_ihave(self.server.post, "240 Article received OK")
  1094. self.handler.allow_posting = False
  1095. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  1096. self.server.post(self.sample_post)
  1097. self.assertEqual(cm.exception.response,
  1098. "440 Posting not permitted")
  1099. def test_ihave(self):
  1100. self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
  1101. "<i.am.an.article.you.will.want@example.com>")
  1102. with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
  1103. self.server.ihave("<another.message.id>", self.sample_post)
  1104. self.assertEqual(cm.exception.response,
  1105. "435 Article not wanted")
  1106. def test_too_long_lines(self):
  1107. dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
  1108. self.assertRaises(nntplib.NNTPDataError,
  1109. self.server.newnews, "comp.lang.python", dt)
  1110. class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
  1111. """Tests an NNTP v1 server (no capabilities)."""
  1112. nntp_version = 1
  1113. handler_class = NNTPv1Handler
  1114. def test_caps(self):
  1115. caps = self.server.getcapabilities()
  1116. self.assertEqual(caps, {})
  1117. self.assertEqual(self.server.nntp_version, 1)
  1118. self.assertEqual(self.server.nntp_implementation, None)
  1119. class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
  1120. """Tests an NNTP v2 server (with capabilities)."""
  1121. nntp_version = 2
  1122. handler_class = NNTPv2Handler
  1123. def test_caps(self):
  1124. caps = self.server.getcapabilities()
  1125. self.assertEqual(caps, {
  1126. 'VERSION': ['2', '3'],
  1127. 'IMPLEMENTATION': ['INN', '2.5.1'],
  1128. 'AUTHINFO': ['USER'],
  1129. 'HDR': [],
  1130. 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
  1131. 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
  1132. 'OVER': [],
  1133. 'POST': [],
  1134. 'READER': [],
  1135. })
  1136. self.assertEqual(self.server.nntp_version, 3)
  1137. self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
  1138. class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
  1139. """Tests a probably NNTP v2 server with capabilities only after login."""
  1140. nntp_version = 2
  1141. handler_class = CapsAfterLoginNNTPv2Handler
  1142. def test_caps_only_after_login(self):
  1143. self.assertEqual(self.server._caps, {})
  1144. self.server.login('testuser', 'testpw')
  1145. self.assertIn('VERSION', self.server._caps)
  1146. class SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
  1147. unittest.TestCase):
  1148. """Same tests as for v2 but we tell NTTP to send MODE READER to a server
  1149. that isn't in READER mode by default."""
  1150. nntp_version = 2
  1151. handler_class = ModeSwitchingNNTPv2Handler
  1152. def test_we_are_in_reader_mode_after_connect(self):
  1153. self.assertIn('READER', self.server._caps)
  1154. class MiscTests(unittest.TestCase):
  1155. def test_decode_header(self):
  1156. def gives(a, b):
  1157. self.assertEqual(nntplib.decode_header(a), b)
  1158. gives("" , "")
  1159. gives("a plain header", "a plain header")
  1160. gives(" with extra spaces ", " with extra spaces ")
  1161. gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
  1162. gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
  1163. " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
  1164. "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
  1165. gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
  1166. "Re: problème de matrice")
  1167. # A natively utf-8 header (found in the real world!)
  1168. gives("Re: Message d'erreur incompréhensible (par moi)",
  1169. "Re: Message d'erreur incompréhensible (par moi)")
  1170. def test_parse_overview_fmt(self):
  1171. # The minimal (default) response
  1172. lines = ["Subject:", "From:", "Date:", "Message-ID:",
  1173. "References:", ":bytes", ":lines"]
  1174. self.assertEqual(nntplib._parse_overview_fmt(lines),
  1175. ["subject", "from", "date", "message-id", "references",
  1176. ":bytes", ":lines"])
  1177. # The minimal response using alternative names
  1178. lines = ["Subject:", "From:", "Date:", "Message-ID:",
  1179. "References:", "Bytes:", "Lines:"]
  1180. self.assertEqual(nntplib._parse_overview_fmt(lines),
  1181. ["subject", "from", "date", "message-id", "references",
  1182. ":bytes", ":lines"])
  1183. # Variations in casing
  1184. lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
  1185. "References:", "BYTES:", "Lines:"]
  1186. self.assertEqual(nntplib._parse_overview_fmt(lines),
  1187. ["subject", "from", "date", "message-id", "references",
  1188. ":bytes", ":lines"])
  1189. # First example from RFC 3977
  1190. lines = ["Subject:", "From:", "Date:", "Message-ID:",
  1191. "References:", ":bytes", ":lines", "Xref:full",
  1192. "Distribution:full"]
  1193. self.assertEqual(nntplib._parse_overview_fmt(lines),
  1194. ["subject", "from", "date", "message-id", "references",
  1195. ":bytes", ":lines", "xref", "distribution"])
  1196. # Second example from RFC 3977
  1197. lines = ["Subject:", "From:", "Date:", "Message-ID:",
  1198. "References:", "Bytes:", "Lines:", "Xref:FULL",
  1199. "Distribution:FULL"]
  1200. self.assertEqual(nntplib._parse_overview_fmt(lines),
  1201. ["subject", "from", "date", "message-id", "references",
  1202. ":bytes", ":lines", "xref", "distribution"])
  1203. # A classic response from INN
  1204. lines = ["Subject:", "From:", "Date:", "Message-ID:",
  1205. "References:", "Bytes:", "Lines:", "Xref:full"]
  1206. self.assertEqual(nntplib._parse_overview_fmt(lines),
  1207. ["subject", "from", "date", "message-id", "references",
  1208. ":bytes", ":lines", "xref"])
  1209. def test_parse_overview(self):
  1210. fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
  1211. # First example from RFC 3977
  1212. lines = [
  1213. '3000234\tI am just a test article\t"Demo User" '
  1214. '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
  1215. '<45223423@example.com>\t<45454@example.net>\t1234\t'
  1216. '17\tXref: news.example.com misc.test:3000363',
  1217. ]
  1218. overview = nntplib._parse_overview(lines, fmt)
  1219. (art_num, fields), = overview
  1220. self.assertEqual(art_num, 3000234)
  1221. self.assertEqual(fields, {
  1222. 'subject': 'I am just a test article',
  1223. 'from': '"Demo User" <nobody@example.com>',
  1224. 'date': '6 Oct 1998 04:38:40 -0500',
  1225. 'message-id': '<45223423@example.com>',
  1226. 'references': '<45454@example.net>',
  1227. ':bytes': '1234',
  1228. ':lines': '17',
  1229. 'xref': 'news.example.com misc.test:3000363',
  1230. })
  1231. # Second example; here the "Xref" field is totally absent (including
  1232. # the header name) and comes out as None
  1233. lines = [
  1234. '3000234\tI am just a test article\t"Demo User" '
  1235. '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
  1236. '<45223423@example.com>\t<45454@example.net>\t1234\t'
  1237. '17\t\t',
  1238. ]
  1239. overview = nntplib._parse_overview(lines, fmt)
  1240. (art_num, fields), = overview
  1241. self.assertEqual(fields['xref'], None)
  1242. # Third example; the "Xref" is an empty string, while "references"
  1243. # is a single space.
  1244. lines = [
  1245. '3000234\tI am just a test article\t"Demo User" '
  1246. '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
  1247. '<45223423@example.com>\t \t1234\t'
  1248. '17\tXref: \t',
  1249. ]
  1250. overview = nntplib._parse_overview(lines, fmt)
  1251. (art_num, fields), = overview
  1252. self.assertEqual(fields['references'], ' ')
  1253. self.assertEqual(fields['xref'], '')
  1254. def test_parse_datetime(self):
  1255. def gives(a, b, *c):
  1256. self.assertEqual(nntplib._parse_datetime(a, b),
  1257. datetime.datetime(*c))
  1258. # Output of DATE command
  1259. gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
  1260. # Variations
  1261. gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
  1262. gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
  1263. gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
  1264. def test_unparse_datetime(self):
  1265. # Test non-legacy mode
  1266. # 1) with a datetime
  1267. def gives(y, M, d, h, m, s, date_str, time_str):
  1268. dt = datetime.datetime(y, M, d, h, m, s)
  1269. self.assertEqual(nntplib._unparse_datetime(dt),
  1270. (date_str, time_str))
  1271. self.assertEqual(nntplib._unparse_datetime(dt, False),
  1272. (date_str, time_str))
  1273. gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
  1274. gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
  1275. gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
  1276. # 2) with a date
  1277. def gives(y, M, d, date_str, time_str):
  1278. dt = datetime.date(y, M, d)
  1279. self.assertEqual(nntplib._unparse_datetime(dt),
  1280. (date_str, time_str))
  1281. self.assertEqual(nntplib._unparse_datetime(dt, False),
  1282. (date_str, time_str))
  1283. gives(1999, 6, 23, "19990623", "000000")
  1284. gives(2000, 6, 23, "20000623", "000000")
  1285. gives(2010, 6, 5, "20100605", "000000")
  1286. def test_unparse_datetime_legacy(self):
  1287. # Test legacy mode (RFC 977)
  1288. # 1) with a datetime
  1289. def gives(y, M, d, h, m, s, date_str, time_str):
  1290. dt = datetime.datetime(y, M, d, h, m, s)
  1291. self.assertEqual(nntplib._unparse_datetime(dt, True),
  1292. (date_str, time_str))
  1293. gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
  1294. gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
  1295. gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
  1296. # 2) with a date
  1297. def gives(y, M, d, date_str, time_str):
  1298. dt = datetime.date(y, M, d)
  1299. self.assertEqual(nntplib._unparse_datetime(dt, True),
  1300. (date_str, time_str))
  1301. gives(1999, 6, 23, "990623", "000000")
  1302. gives(2000, 6, 23, "000623", "000000")
  1303. gives(2010, 6, 5, "100605", "000000")
  1304. @unittest.skipUnless(ssl, 'requires SSL support')
  1305. def test_ssl_support(self):
  1306. self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
  1307. class PublicAPITests(unittest.TestCase):
  1308. """Ensures that the correct values are exposed in the public API."""
  1309. def test_module_all_attribute(self):
  1310. self.assertTrue(hasattr(nntplib, '__all__'))
  1311. target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
  1312. 'NNTPTemporaryError', 'NNTPPermanentError',
  1313. 'NNTPProtocolError', 'NNTPDataError', 'decode_header']
  1314. if ssl is not None:
  1315. target_api.append('NNTP_SSL')
  1316. self.assertEqual(set(nntplib.__all__), set(target_api))
  1317. class MockSocketTests(unittest.TestCase):
  1318. """Tests involving a mock socket object
  1319. Used where the _NNTPServerIO file object is not enough."""
  1320. nntp_class = nntplib.NNTP
  1321. def check_constructor_error_conditions(
  1322. self, handler_class,
  1323. expected_error_type, expected_error_msg,
  1324. login=None, password=None):
  1325. class mock_socket_module:
  1326. def create_connection(address, timeout):
  1327. return MockSocket()
  1328. class MockSocket:
  1329. def close(self):
  1330. nonlocal socket_closed
  1331. socket_closed = True
  1332. def makefile(socket, mode):
  1333. handler = handler_class()
  1334. _, file = make_mock_file(handler)
  1335. files.append(file)
  1336. return file
  1337. socket_closed = False
  1338. files = []
  1339. with patch('nntplib.socket', mock_socket_module), \
  1340. self.assertRaisesRegex(expected_error_type, expected_error_msg):
  1341. self.nntp_class('dummy', user=login, password=password)
  1342. self.assertTrue(socket_closed)
  1343. for f in files:
  1344. self.assertTrue(f.closed)
  1345. def test_bad_welcome(self):
  1346. #Test a bad welcome message
  1347. class Handler(NNTPv1Handler):
  1348. welcome = 'Bad Welcome'
  1349. self.check_constructor_error_conditions(
  1350. Handler, nntplib.NNTPProtocolError, Handler.welcome)
  1351. def test_service_temporarily_unavailable(self):
  1352. #Test service temporarily unavailable
  1353. class Handler(NNTPv1Handler):
  1354. welcome = '400 Service temporarily unavailable'
  1355. self.check_constructor_error_conditions(
  1356. Handler, nntplib.NNTPTemporaryError, Handler.welcome)
  1357. def test_service_permanently_unavailable(self):
  1358. #Test service permanently unavailable
  1359. class Handler(NNTPv1Handler):
  1360. welcome = '502 Service permanently unavailable'
  1361. self.check_constructor_error_conditions(
  1362. Handler, nntplib.NNTPPermanentError, Handler.welcome)
  1363. def test_bad_capabilities(self):
  1364. #Test a bad capabilities response
  1365. class Handler(NNTPv1Handler):
  1366. def handle_CAPABILITIES(self):
  1367. self.push_lit(capabilities_response)
  1368. capabilities_response = '201 bad capability'
  1369. self.check_constructor_error_conditions(
  1370. Handler, nntplib.NNTPReplyError, capabilities_response)
  1371. def test_login_aborted(self):
  1372. #Test a bad authinfo response
  1373. login = 't@e.com'
  1374. password = 'python'
  1375. class Handler(NNTPv1Handler):
  1376. def handle_AUTHINFO(self, *args):
  1377. self.push_lit(authinfo_response)
  1378. authinfo_response = '503 Mechanism not recognized'
  1379. self.check_constructor_error_conditions(
  1380. Handler, nntplib.NNTPPermanentError, authinfo_response,
  1381. login, password)
  1382. class bypass_context:
  1383. """Bypass encryption and actual SSL module"""
  1384. def wrap_socket(sock, **args):
  1385. return sock
  1386. @unittest.skipUnless(ssl, 'requires SSL support')
  1387. class MockSslTests(MockSocketTests):
  1388. @staticmethod
  1389. def nntp_class(*pos, **kw):
  1390. return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
  1391. class LocalServerTests(unittest.TestCase):
  1392. def setUp(self):
  1393. sock = socket.socket()
  1394. port = socket_helper.bind_port(sock)
  1395. sock.listen()
  1396. self.background = threading.Thread(
  1397. target=self.run_server, args=(sock,))
  1398. self.background.start()
  1399. self.addCleanup(self.background.join)
  1400. self.nntp = self.enterContext(NNTP(socket_helper.HOST, port, usenetrc=False))
  1401. def run_server(self, sock):
  1402. # Could be generalized to handle more commands in separate methods
  1403. with sock:
  1404. [client, _] = sock.accept()
  1405. with contextlib.ExitStack() as cleanup:
  1406. cleanup.enter_context(client)
  1407. reader = cleanup.enter_context(client.makefile('rb'))
  1408. client.sendall(b'200 Server ready\r\n')
  1409. while True:
  1410. cmd = reader.readline()
  1411. if cmd == b'CAPABILITIES\r\n':
  1412. client.sendall(
  1413. b'101 Capability list:\r\n'
  1414. b'VERSION 2\r\n'
  1415. b'STARTTLS\r\n'
  1416. b'.\r\n'
  1417. )
  1418. elif cmd == b'STARTTLS\r\n':
  1419. reader.close()
  1420. client.sendall(b'382 Begin TLS negotiation now\r\n')
  1421. context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  1422. context.load_cert_chain(certfile)
  1423. client = context.wrap_socket(
  1424. client, server_side=True)
  1425. cleanup.enter_context(client)
  1426. reader = cleanup.enter_context(client.makefile('rb'))
  1427. elif cmd == b'QUIT\r\n':
  1428. client.sendall(b'205 Bye!\r\n')
  1429. break
  1430. else:
  1431. raise ValueError('Unexpected command {!r}'.format(cmd))
  1432. @unittest.skipUnless(ssl, 'requires SSL support')
  1433. def test_starttls(self):
  1434. file = self.nntp.file
  1435. sock = self.nntp.sock
  1436. self.nntp.starttls()
  1437. # Check that the socket and internal pseudo-file really were
  1438. # changed.
  1439. self.assertNotEqual(file, self.nntp.file)
  1440. self.assertNotEqual(sock, self.nntp.sock)
  1441. # Check that the new socket really is an SSL one
  1442. self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
  1443. # Check that trying starttls when it's already active fails.
  1444. self.assertRaises(ValueError, self.nntp.starttls)
  1445. if __name__ == "__main__":
  1446. unittest.main()