test_mailbox.py 92 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313
  1. import os
  2. import sys
  3. import time
  4. import stat
  5. import socket
  6. import email
  7. import email.message
  8. import re
  9. import io
  10. import tempfile
  11. from test import support
  12. from test.support import os_helper
  13. from test.support import socket_helper
  14. import unittest
  15. import textwrap
  16. import mailbox
  17. import glob
  18. if not socket_helper.has_gethostname:
  19. raise unittest.SkipTest("test requires gethostname()")
  20. class TestBase:
  21. all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage,
  22. mailbox.mboxMessage, mailbox.MHMessage,
  23. mailbox.BabylMessage, mailbox.MMDFMessage)
  24. def _check_sample(self, msg):
  25. # Inspect a mailbox.Message representation of the sample message
  26. self.assertIsInstance(msg, email.message.Message)
  27. self.assertIsInstance(msg, mailbox.Message)
  28. for key, value in _sample_headers.items():
  29. self.assertIn(value, msg.get_all(key))
  30. self.assertTrue(msg.is_multipart())
  31. self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
  32. for i, payload in enumerate(_sample_payloads):
  33. part = msg.get_payload(i)
  34. self.assertIsInstance(part, email.message.Message)
  35. self.assertNotIsInstance(part, mailbox.Message)
  36. self.assertEqual(part.get_payload(), payload)
  37. def _delete_recursively(self, target):
  38. # Delete a file or delete a directory recursively
  39. if os.path.isdir(target):
  40. os_helper.rmtree(target)
  41. elif os.path.exists(target):
  42. os_helper.unlink(target)
  43. class TestMailbox(TestBase):
  44. maxDiff = None
  45. _factory = None # Overridden by subclasses to reuse tests
  46. _template = 'From: foo\n\n%s\n'
  47. def setUp(self):
  48. self._path = os_helper.TESTFN
  49. self._delete_recursively(self._path)
  50. self._box = self._factory(self._path)
  51. def tearDown(self):
  52. self._box.close()
  53. self._delete_recursively(self._path)
  54. def test_add(self):
  55. # Add copies of a sample message
  56. keys = []
  57. keys.append(self._box.add(self._template % 0))
  58. self.assertEqual(len(self._box), 1)
  59. keys.append(self._box.add(mailbox.Message(_sample_message)))
  60. self.assertEqual(len(self._box), 2)
  61. keys.append(self._box.add(email.message_from_string(_sample_message)))
  62. self.assertEqual(len(self._box), 3)
  63. keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
  64. self.assertEqual(len(self._box), 4)
  65. keys.append(self._box.add(_sample_message))
  66. self.assertEqual(len(self._box), 5)
  67. keys.append(self._box.add(_bytes_sample_message))
  68. self.assertEqual(len(self._box), 6)
  69. with self.assertWarns(DeprecationWarning):
  70. keys.append(self._box.add(
  71. io.TextIOWrapper(io.BytesIO(_bytes_sample_message), encoding="utf-8")))
  72. self.assertEqual(len(self._box), 7)
  73. self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
  74. for i in (1, 2, 3, 4, 5, 6):
  75. self._check_sample(self._box[keys[i]])
  76. _nonascii_msg = textwrap.dedent("""\
  77. From: foo
  78. Subject: Falinaptár házhozszállítással. Már rendeltél?
  79. 0
  80. """)
  81. def test_add_invalid_8bit_bytes_header(self):
  82. key = self._box.add(self._nonascii_msg.encode('latin-1'))
  83. self.assertEqual(len(self._box), 1)
  84. self.assertEqual(self._box.get_bytes(key),
  85. self._nonascii_msg.encode('latin-1'))
  86. def test_invalid_nonascii_header_as_string(self):
  87. subj = self._nonascii_msg.splitlines()[1]
  88. key = self._box.add(subj.encode('latin-1'))
  89. self.assertEqual(self._box.get_string(key),
  90. 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
  91. 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
  92. def test_add_nonascii_string_header_raises(self):
  93. with self.assertRaisesRegex(ValueError, "ASCII-only"):
  94. self._box.add(self._nonascii_msg)
  95. self._box.flush()
  96. self.assertEqual(len(self._box), 0)
  97. self.assertMailboxEmpty()
  98. def test_add_that_raises_leaves_mailbox_empty(self):
  99. def raiser(*args, **kw):
  100. raise Exception("a fake error")
  101. support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
  102. with self.assertRaises(Exception):
  103. self._box.add(email.message_from_string("From: Alphöso"))
  104. self.assertEqual(len(self._box), 0)
  105. self._box.close()
  106. self.assertMailboxEmpty()
  107. _non_latin_bin_msg = textwrap.dedent("""\
  108. From: foo@bar.com
  109. To: báz
  110. Subject: Maintenant je vous présente mon collègue, le pouf célèbre
  111. \tJean de Baddie
  112. Mime-Version: 1.0
  113. Content-Type: text/plain; charset="utf-8"
  114. Content-Transfer-Encoding: 8bit
  115. Да, они летят.
  116. """).encode('utf-8')
  117. def test_add_8bit_body(self):
  118. key = self._box.add(self._non_latin_bin_msg)
  119. self.assertEqual(self._box.get_bytes(key),
  120. self._non_latin_bin_msg)
  121. with self._box.get_file(key) as f:
  122. self.assertEqual(f.read(),
  123. self._non_latin_bin_msg.replace(b'\n',
  124. os.linesep.encode()))
  125. self.assertEqual(self._box[key].get_payload(),
  126. "Да, они летят.\n")
  127. def test_add_binary_file(self):
  128. with tempfile.TemporaryFile('wb+') as f:
  129. f.write(_bytes_sample_message)
  130. f.seek(0)
  131. key = self._box.add(f)
  132. self.assertEqual(self._box.get_bytes(key).split(b'\n'),
  133. _bytes_sample_message.split(b'\n'))
  134. def test_add_binary_nonascii_file(self):
  135. with tempfile.TemporaryFile('wb+') as f:
  136. f.write(self._non_latin_bin_msg)
  137. f.seek(0)
  138. key = self._box.add(f)
  139. self.assertEqual(self._box.get_bytes(key).split(b'\n'),
  140. self._non_latin_bin_msg.split(b'\n'))
  141. def test_add_text_file_warns(self):
  142. with tempfile.TemporaryFile('w+', encoding='utf-8') as f:
  143. f.write(_sample_message)
  144. f.seek(0)
  145. with self.assertWarns(DeprecationWarning):
  146. key = self._box.add(f)
  147. self.assertEqual(self._box.get_bytes(key).split(b'\n'),
  148. _bytes_sample_message.split(b'\n'))
  149. def test_add_StringIO_warns(self):
  150. with self.assertWarns(DeprecationWarning):
  151. key = self._box.add(io.StringIO(self._template % "0"))
  152. self.assertEqual(self._box.get_string(key), self._template % "0")
  153. def test_add_nonascii_StringIO_raises(self):
  154. with self.assertWarns(DeprecationWarning):
  155. with self.assertRaisesRegex(ValueError, "ASCII-only"):
  156. self._box.add(io.StringIO(self._nonascii_msg))
  157. self.assertEqual(len(self._box), 0)
  158. self._box.close()
  159. self.assertMailboxEmpty()
  160. def test_remove(self):
  161. # Remove messages using remove()
  162. self._test_remove_or_delitem(self._box.remove)
  163. def test_delitem(self):
  164. # Remove messages using __delitem__()
  165. self._test_remove_or_delitem(self._box.__delitem__)
  166. def _test_remove_or_delitem(self, method):
  167. # (Used by test_remove() and test_delitem().)
  168. key0 = self._box.add(self._template % 0)
  169. key1 = self._box.add(self._template % 1)
  170. self.assertEqual(len(self._box), 2)
  171. method(key0)
  172. self.assertEqual(len(self._box), 1)
  173. self.assertRaises(KeyError, lambda: self._box[key0])
  174. self.assertRaises(KeyError, lambda: method(key0))
  175. self.assertEqual(self._box.get_string(key1), self._template % 1)
  176. key2 = self._box.add(self._template % 2)
  177. self.assertEqual(len(self._box), 2)
  178. method(key2)
  179. self.assertEqual(len(self._box), 1)
  180. self.assertRaises(KeyError, lambda: self._box[key2])
  181. self.assertRaises(KeyError, lambda: method(key2))
  182. self.assertEqual(self._box.get_string(key1), self._template % 1)
  183. method(key1)
  184. self.assertEqual(len(self._box), 0)
  185. self.assertRaises(KeyError, lambda: self._box[key1])
  186. self.assertRaises(KeyError, lambda: method(key1))
  187. def test_discard(self, repetitions=10):
  188. # Discard messages
  189. key0 = self._box.add(self._template % 0)
  190. key1 = self._box.add(self._template % 1)
  191. self.assertEqual(len(self._box), 2)
  192. self._box.discard(key0)
  193. self.assertEqual(len(self._box), 1)
  194. self.assertRaises(KeyError, lambda: self._box[key0])
  195. self._box.discard(key0)
  196. self.assertEqual(len(self._box), 1)
  197. self.assertRaises(KeyError, lambda: self._box[key0])
  198. def test_get(self):
  199. # Retrieve messages using get()
  200. key0 = self._box.add(self._template % 0)
  201. msg = self._box.get(key0)
  202. self.assertEqual(msg['from'], 'foo')
  203. self.assertEqual(msg.get_payload(), '0\n')
  204. self.assertIsNone(self._box.get('foo'))
  205. self.assertIs(self._box.get('foo', False), False)
  206. self._box.close()
  207. self._box = self._factory(self._path)
  208. key1 = self._box.add(self._template % 1)
  209. msg = self._box.get(key1)
  210. self.assertEqual(msg['from'], 'foo')
  211. self.assertEqual(msg.get_payload(), '1\n')
  212. def test_getitem(self):
  213. # Retrieve message using __getitem__()
  214. key0 = self._box.add(self._template % 0)
  215. msg = self._box[key0]
  216. self.assertEqual(msg['from'], 'foo')
  217. self.assertEqual(msg.get_payload(), '0\n')
  218. self.assertRaises(KeyError, lambda: self._box['foo'])
  219. self._box.discard(key0)
  220. self.assertRaises(KeyError, lambda: self._box[key0])
  221. def test_get_message(self):
  222. # Get Message representations of messages
  223. key0 = self._box.add(self._template % 0)
  224. key1 = self._box.add(_sample_message)
  225. msg0 = self._box.get_message(key0)
  226. self.assertIsInstance(msg0, mailbox.Message)
  227. self.assertEqual(msg0['from'], 'foo')
  228. self.assertEqual(msg0.get_payload(), '0\n')
  229. self._check_sample(self._box.get_message(key1))
  230. def test_get_bytes(self):
  231. # Get bytes representations of messages
  232. key0 = self._box.add(self._template % 0)
  233. key1 = self._box.add(_sample_message)
  234. self.assertEqual(self._box.get_bytes(key0),
  235. (self._template % 0).encode('ascii'))
  236. self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
  237. def test_get_string(self):
  238. # Get string representations of messages
  239. key0 = self._box.add(self._template % 0)
  240. key1 = self._box.add(_sample_message)
  241. self.assertEqual(self._box.get_string(key0), self._template % 0)
  242. self.assertEqual(self._box.get_string(key1).split('\n'),
  243. _sample_message.split('\n'))
  244. def test_get_file(self):
  245. # Get file representations of messages
  246. key0 = self._box.add(self._template % 0)
  247. key1 = self._box.add(_sample_message)
  248. with self._box.get_file(key0) as file:
  249. data0 = file.read()
  250. with self._box.get_file(key1) as file:
  251. data1 = file.read()
  252. self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
  253. self._template % 0)
  254. self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
  255. _sample_message)
  256. def test_get_file_can_be_closed_twice(self):
  257. # Issue 11700
  258. key = self._box.add(_sample_message)
  259. f = self._box.get_file(key)
  260. f.close()
  261. f.close()
  262. def test_iterkeys(self):
  263. # Get keys using iterkeys()
  264. self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
  265. def test_keys(self):
  266. # Get keys using keys()
  267. self._check_iteration(self._box.keys, do_keys=True, do_values=False)
  268. def test_itervalues(self):
  269. # Get values using itervalues()
  270. self._check_iteration(self._box.itervalues, do_keys=False,
  271. do_values=True)
  272. def test_iter(self):
  273. # Get values using __iter__()
  274. self._check_iteration(self._box.__iter__, do_keys=False,
  275. do_values=True)
  276. def test_values(self):
  277. # Get values using values()
  278. self._check_iteration(self._box.values, do_keys=False, do_values=True)
  279. def test_iteritems(self):
  280. # Get keys and values using iteritems()
  281. self._check_iteration(self._box.iteritems, do_keys=True,
  282. do_values=True)
  283. def test_items(self):
  284. # Get keys and values using items()
  285. self._check_iteration(self._box.items, do_keys=True, do_values=True)
  286. def _check_iteration(self, method, do_keys, do_values, repetitions=10):
  287. for value in method():
  288. self.fail("Not empty")
  289. keys, values = [], []
  290. for i in range(repetitions):
  291. keys.append(self._box.add(self._template % i))
  292. values.append(self._template % i)
  293. if do_keys and not do_values:
  294. returned_keys = list(method())
  295. elif do_values and not do_keys:
  296. returned_values = list(method())
  297. else:
  298. returned_keys, returned_values = [], []
  299. for key, value in method():
  300. returned_keys.append(key)
  301. returned_values.append(value)
  302. if do_keys:
  303. self.assertEqual(len(keys), len(returned_keys))
  304. self.assertEqual(set(keys), set(returned_keys))
  305. if do_values:
  306. count = 0
  307. for value in returned_values:
  308. self.assertEqual(value['from'], 'foo')
  309. self.assertLess(int(value.get_payload()), repetitions)
  310. count += 1
  311. self.assertEqual(len(values), count)
  312. def test_contains(self):
  313. # Check existence of keys using __contains__()
  314. self.assertNotIn('foo', self._box)
  315. key0 = self._box.add(self._template % 0)
  316. self.assertIn(key0, self._box)
  317. self.assertNotIn('foo', self._box)
  318. key1 = self._box.add(self._template % 1)
  319. self.assertIn(key1, self._box)
  320. self.assertIn(key0, self._box)
  321. self.assertNotIn('foo', self._box)
  322. self._box.remove(key0)
  323. self.assertNotIn(key0, self._box)
  324. self.assertIn(key1, self._box)
  325. self.assertNotIn('foo', self._box)
  326. self._box.remove(key1)
  327. self.assertNotIn(key1, self._box)
  328. self.assertNotIn(key0, self._box)
  329. self.assertNotIn('foo', self._box)
  330. def test_len(self, repetitions=10):
  331. # Get message count
  332. keys = []
  333. for i in range(repetitions):
  334. self.assertEqual(len(self._box), i)
  335. keys.append(self._box.add(self._template % i))
  336. self.assertEqual(len(self._box), i + 1)
  337. for i in range(repetitions):
  338. self.assertEqual(len(self._box), repetitions - i)
  339. self._box.remove(keys[i])
  340. self.assertEqual(len(self._box), repetitions - i - 1)
  341. def test_set_item(self):
  342. # Modify messages using __setitem__()
  343. key0 = self._box.add(self._template % 'original 0')
  344. self.assertEqual(self._box.get_string(key0),
  345. self._template % 'original 0')
  346. key1 = self._box.add(self._template % 'original 1')
  347. self.assertEqual(self._box.get_string(key1),
  348. self._template % 'original 1')
  349. self._box[key0] = self._template % 'changed 0'
  350. self.assertEqual(self._box.get_string(key0),
  351. self._template % 'changed 0')
  352. self._box[key1] = self._template % 'changed 1'
  353. self.assertEqual(self._box.get_string(key1),
  354. self._template % 'changed 1')
  355. self._box[key0] = _sample_message
  356. self._check_sample(self._box[key0])
  357. self._box[key1] = self._box[key0]
  358. self._check_sample(self._box[key1])
  359. self._box[key0] = self._template % 'original 0'
  360. self.assertEqual(self._box.get_string(key0),
  361. self._template % 'original 0')
  362. self._check_sample(self._box[key1])
  363. self.assertRaises(KeyError,
  364. lambda: self._box.__setitem__('foo', 'bar'))
  365. self.assertRaises(KeyError, lambda: self._box['foo'])
  366. self.assertEqual(len(self._box), 2)
  367. def test_clear(self, iterations=10):
  368. # Remove all messages using clear()
  369. keys = []
  370. for i in range(iterations):
  371. self._box.add(self._template % i)
  372. for i, key in enumerate(keys):
  373. self.assertEqual(self._box.get_string(key), self._template % i)
  374. self._box.clear()
  375. self.assertEqual(len(self._box), 0)
  376. for i, key in enumerate(keys):
  377. self.assertRaises(KeyError, lambda: self._box.get_string(key))
  378. def test_pop(self):
  379. # Get and remove a message using pop()
  380. key0 = self._box.add(self._template % 0)
  381. self.assertIn(key0, self._box)
  382. key1 = self._box.add(self._template % 1)
  383. self.assertIn(key1, self._box)
  384. self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
  385. self.assertNotIn(key0, self._box)
  386. self.assertIn(key1, self._box)
  387. key2 = self._box.add(self._template % 2)
  388. self.assertIn(key2, self._box)
  389. self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
  390. self.assertNotIn(key2, self._box)
  391. self.assertIn(key1, self._box)
  392. self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
  393. self.assertNotIn(key1, self._box)
  394. self.assertEqual(len(self._box), 0)
  395. def test_popitem(self, iterations=10):
  396. # Get and remove an arbitrary (key, message) using popitem()
  397. keys = []
  398. for i in range(10):
  399. keys.append(self._box.add(self._template % i))
  400. seen = []
  401. for i in range(10):
  402. key, msg = self._box.popitem()
  403. self.assertIn(key, keys)
  404. self.assertNotIn(key, seen)
  405. seen.append(key)
  406. self.assertEqual(int(msg.get_payload()), keys.index(key))
  407. self.assertEqual(len(self._box), 0)
  408. for key in keys:
  409. self.assertRaises(KeyError, lambda: self._box[key])
  410. def test_update(self):
  411. # Modify multiple messages using update()
  412. key0 = self._box.add(self._template % 'original 0')
  413. key1 = self._box.add(self._template % 'original 1')
  414. key2 = self._box.add(self._template % 'original 2')
  415. self._box.update({key0: self._template % 'changed 0',
  416. key2: _sample_message})
  417. self.assertEqual(len(self._box), 3)
  418. self.assertEqual(self._box.get_string(key0),
  419. self._template % 'changed 0')
  420. self.assertEqual(self._box.get_string(key1),
  421. self._template % 'original 1')
  422. self._check_sample(self._box[key2])
  423. self._box.update([(key2, self._template % 'changed 2'),
  424. (key1, self._template % 'changed 1'),
  425. (key0, self._template % 'original 0')])
  426. self.assertEqual(len(self._box), 3)
  427. self.assertEqual(self._box.get_string(key0),
  428. self._template % 'original 0')
  429. self.assertEqual(self._box.get_string(key1),
  430. self._template % 'changed 1')
  431. self.assertEqual(self._box.get_string(key2),
  432. self._template % 'changed 2')
  433. self.assertRaises(KeyError,
  434. lambda: self._box.update({'foo': 'bar',
  435. key0: self._template % "changed 0"}))
  436. self.assertEqual(len(self._box), 3)
  437. self.assertEqual(self._box.get_string(key0),
  438. self._template % "changed 0")
  439. self.assertEqual(self._box.get_string(key1),
  440. self._template % "changed 1")
  441. self.assertEqual(self._box.get_string(key2),
  442. self._template % "changed 2")
  443. def test_flush(self):
  444. # Write changes to disk
  445. self._test_flush_or_close(self._box.flush, True)
  446. def test_popitem_and_flush_twice(self):
  447. # See #15036.
  448. self._box.add(self._template % 0)
  449. self._box.add(self._template % 1)
  450. self._box.flush()
  451. self._box.popitem()
  452. self._box.flush()
  453. self._box.popitem()
  454. self._box.flush()
  455. def test_lock_unlock(self):
  456. # Lock and unlock the mailbox
  457. self.assertFalse(os.path.exists(self._get_lock_path()))
  458. self._box.lock()
  459. self.assertTrue(os.path.exists(self._get_lock_path()))
  460. self._box.unlock()
  461. self.assertFalse(os.path.exists(self._get_lock_path()))
  462. def test_close(self):
  463. # Close mailbox and flush changes to disk
  464. self._test_flush_or_close(self._box.close, False)
  465. def _test_flush_or_close(self, method, should_call_close):
  466. contents = [self._template % i for i in range(3)]
  467. self._box.add(contents[0])
  468. self._box.add(contents[1])
  469. self._box.add(contents[2])
  470. oldbox = self._box
  471. method()
  472. if should_call_close:
  473. self._box.close()
  474. self._box = self._factory(self._path)
  475. keys = self._box.keys()
  476. self.assertEqual(len(keys), 3)
  477. for key in keys:
  478. self.assertIn(self._box.get_string(key), contents)
  479. oldbox.close()
  480. def test_dump_message(self):
  481. # Write message representations to disk
  482. for input in (email.message_from_string(_sample_message),
  483. _sample_message, io.BytesIO(_bytes_sample_message)):
  484. output = io.BytesIO()
  485. self._box._dump_message(input, output)
  486. self.assertEqual(output.getvalue(),
  487. _bytes_sample_message.replace(b'\n', os.linesep.encode()))
  488. output = io.BytesIO()
  489. self.assertRaises(TypeError,
  490. lambda: self._box._dump_message(None, output))
  491. def _get_lock_path(self):
  492. # Return the path of the dot lock file. May be overridden.
  493. return self._path + '.lock'
  494. class TestMailboxSuperclass(TestBase, unittest.TestCase):
  495. def test_notimplemented(self):
  496. # Test that all Mailbox methods raise NotImplementedException.
  497. box = mailbox.Mailbox('path')
  498. self.assertRaises(NotImplementedError, lambda: box.add(''))
  499. self.assertRaises(NotImplementedError, lambda: box.remove(''))
  500. self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
  501. self.assertRaises(NotImplementedError, lambda: box.discard(''))
  502. self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
  503. self.assertRaises(NotImplementedError, lambda: box.iterkeys())
  504. self.assertRaises(NotImplementedError, lambda: box.keys())
  505. self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__())
  506. self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__())
  507. self.assertRaises(NotImplementedError, lambda: box.values())
  508. self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__())
  509. self.assertRaises(NotImplementedError, lambda: box.items())
  510. self.assertRaises(NotImplementedError, lambda: box.get(''))
  511. self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
  512. self.assertRaises(NotImplementedError, lambda: box.get_message(''))
  513. self.assertRaises(NotImplementedError, lambda: box.get_string(''))
  514. self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
  515. self.assertRaises(NotImplementedError, lambda: box.get_file(''))
  516. self.assertRaises(NotImplementedError, lambda: '' in box)
  517. self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
  518. self.assertRaises(NotImplementedError, lambda: box.__len__())
  519. self.assertRaises(NotImplementedError, lambda: box.clear())
  520. self.assertRaises(NotImplementedError, lambda: box.pop(''))
  521. self.assertRaises(NotImplementedError, lambda: box.popitem())
  522. self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
  523. self.assertRaises(NotImplementedError, lambda: box.flush())
  524. self.assertRaises(NotImplementedError, lambda: box.lock())
  525. self.assertRaises(NotImplementedError, lambda: box.unlock())
  526. self.assertRaises(NotImplementedError, lambda: box.close())
  527. class TestMaildir(TestMailbox, unittest.TestCase):
  528. _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
  529. def setUp(self):
  530. TestMailbox.setUp(self)
  531. if (os.name == 'nt') or (sys.platform == 'cygwin'):
  532. self._box.colon = '!'
  533. def assertMailboxEmpty(self):
  534. self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
  535. def test_add_MM(self):
  536. # Add a MaildirMessage instance
  537. msg = mailbox.MaildirMessage(self._template % 0)
  538. msg.set_subdir('cur')
  539. msg.set_info('foo')
  540. key = self._box.add(msg)
  541. self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
  542. (key, self._box.colon))))
  543. def test_get_MM(self):
  544. # Get a MaildirMessage instance
  545. msg = mailbox.MaildirMessage(self._template % 0)
  546. msg.set_subdir('cur')
  547. msg.set_flags('RF')
  548. key = self._box.add(msg)
  549. msg_returned = self._box.get_message(key)
  550. self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
  551. self.assertEqual(msg_returned.get_subdir(), 'cur')
  552. self.assertEqual(msg_returned.get_flags(), 'FR')
  553. def test_set_MM(self):
  554. # Set with a MaildirMessage instance
  555. msg0 = mailbox.MaildirMessage(self._template % 0)
  556. msg0.set_flags('TP')
  557. key = self._box.add(msg0)
  558. msg_returned = self._box.get_message(key)
  559. self.assertEqual(msg_returned.get_subdir(), 'new')
  560. self.assertEqual(msg_returned.get_flags(), 'PT')
  561. msg1 = mailbox.MaildirMessage(self._template % 1)
  562. self._box[key] = msg1
  563. msg_returned = self._box.get_message(key)
  564. self.assertEqual(msg_returned.get_subdir(), 'new')
  565. self.assertEqual(msg_returned.get_flags(), '')
  566. self.assertEqual(msg_returned.get_payload(), '1\n')
  567. msg2 = mailbox.MaildirMessage(self._template % 2)
  568. msg2.set_info('2,S')
  569. self._box[key] = msg2
  570. self._box[key] = self._template % 3
  571. msg_returned = self._box.get_message(key)
  572. self.assertEqual(msg_returned.get_subdir(), 'new')
  573. self.assertEqual(msg_returned.get_flags(), 'S')
  574. self.assertEqual(msg_returned.get_payload(), '3\n')
  575. def test_consistent_factory(self):
  576. # Add a message.
  577. msg = mailbox.MaildirMessage(self._template % 0)
  578. msg.set_subdir('cur')
  579. msg.set_flags('RF')
  580. key = self._box.add(msg)
  581. # Create new mailbox with
  582. class FakeMessage(mailbox.MaildirMessage):
  583. pass
  584. box = mailbox.Maildir(self._path, factory=FakeMessage)
  585. box.colon = self._box.colon
  586. msg2 = box.get_message(key)
  587. self.assertIsInstance(msg2, FakeMessage)
  588. def test_initialize_new(self):
  589. # Initialize a non-existent mailbox
  590. self.tearDown()
  591. self._box = mailbox.Maildir(self._path)
  592. self._check_basics()
  593. self._delete_recursively(self._path)
  594. self._box = self._factory(self._path, factory=None)
  595. self._check_basics()
  596. def test_initialize_existing(self):
  597. # Initialize an existing mailbox
  598. self.tearDown()
  599. for subdir in '', 'tmp', 'new', 'cur':
  600. os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
  601. self._box = mailbox.Maildir(self._path)
  602. self._check_basics()
  603. def _check_basics(self, factory=None):
  604. # (Used by test_open_new() and test_open_existing().)
  605. self.assertEqual(self._box._path, os.path.abspath(self._path))
  606. self.assertEqual(self._box._factory, factory)
  607. for subdir in '', 'tmp', 'new', 'cur':
  608. path = os.path.join(self._path, subdir)
  609. mode = os.stat(path)[stat.ST_MODE]
  610. self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
  611. def test_list_folders(self):
  612. # List folders
  613. self._box.add_folder('one')
  614. self._box.add_folder('two')
  615. self._box.add_folder('three')
  616. self.assertEqual(len(self._box.list_folders()), 3)
  617. self.assertEqual(set(self._box.list_folders()),
  618. set(('one', 'two', 'three')))
  619. def test_get_folder(self):
  620. # Open folders
  621. self._box.add_folder('foo.bar')
  622. folder0 = self._box.get_folder('foo.bar')
  623. folder0.add(self._template % 'bar')
  624. self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
  625. folder1 = self._box.get_folder('foo.bar')
  626. self.assertEqual(folder1.get_string(folder1.keys()[0]),
  627. self._template % 'bar')
  628. def test_add_and_remove_folders(self):
  629. # Delete folders
  630. self._box.add_folder('one')
  631. self._box.add_folder('two')
  632. self.assertEqual(len(self._box.list_folders()), 2)
  633. self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
  634. self._box.remove_folder('one')
  635. self.assertEqual(len(self._box.list_folders()), 1)
  636. self.assertEqual(set(self._box.list_folders()), set(('two',)))
  637. self._box.add_folder('three')
  638. self.assertEqual(len(self._box.list_folders()), 2)
  639. self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
  640. self._box.remove_folder('three')
  641. self.assertEqual(len(self._box.list_folders()), 1)
  642. self.assertEqual(set(self._box.list_folders()), set(('two',)))
  643. self._box.remove_folder('two')
  644. self.assertEqual(len(self._box.list_folders()), 0)
  645. self.assertEqual(self._box.list_folders(), [])
  646. def test_clean(self):
  647. # Remove old files from 'tmp'
  648. foo_path = os.path.join(self._path, 'tmp', 'foo')
  649. bar_path = os.path.join(self._path, 'tmp', 'bar')
  650. with open(foo_path, 'w', encoding='utf-8') as f:
  651. f.write("@")
  652. with open(bar_path, 'w', encoding='utf-8') as f:
  653. f.write("@")
  654. self._box.clean()
  655. self.assertTrue(os.path.exists(foo_path))
  656. self.assertTrue(os.path.exists(bar_path))
  657. foo_stat = os.stat(foo_path)
  658. os.utime(foo_path, (time.time() - 129600 - 2,
  659. foo_stat.st_mtime))
  660. self._box.clean()
  661. self.assertFalse(os.path.exists(foo_path))
  662. self.assertTrue(os.path.exists(bar_path))
  663. def test_create_tmp(self, repetitions=10):
  664. # Create files in tmp directory
  665. hostname = socket.gethostname()
  666. if '/' in hostname:
  667. hostname = hostname.replace('/', r'\057')
  668. if ':' in hostname:
  669. hostname = hostname.replace(':', r'\072')
  670. pid = os.getpid()
  671. pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
  672. r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)")
  673. previous_groups = None
  674. for x in range(repetitions):
  675. tmp_file = self._box._create_tmp()
  676. head, tail = os.path.split(tmp_file.name)
  677. self.assertEqual(head, os.path.abspath(os.path.join(self._path,
  678. "tmp")),
  679. "File in wrong location: '%s'" % head)
  680. match = pattern.match(tail)
  681. self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
  682. groups = match.groups()
  683. if previous_groups is not None:
  684. self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
  685. "Non-monotonic seconds: '%s' before '%s'" %
  686. (previous_groups[0], groups[0]))
  687. if int(groups[0]) == int(previous_groups[0]):
  688. self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
  689. "Non-monotonic milliseconds: '%s' before '%s'" %
  690. (previous_groups[1], groups[1]))
  691. self.assertEqual(int(groups[2]), pid,
  692. "Process ID mismatch: '%s' should be '%s'" %
  693. (groups[2], pid))
  694. self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
  695. "Non-sequential counter: '%s' before '%s'" %
  696. (previous_groups[3], groups[3]))
  697. self.assertEqual(groups[4], hostname,
  698. "Host name mismatch: '%s' should be '%s'" %
  699. (groups[4], hostname))
  700. previous_groups = groups
  701. tmp_file.write(_bytes_sample_message)
  702. tmp_file.seek(0)
  703. self.assertEqual(tmp_file.read(), _bytes_sample_message)
  704. tmp_file.close()
  705. file_count = len(os.listdir(os.path.join(self._path, "tmp")))
  706. self.assertEqual(file_count, repetitions,
  707. "Wrong file count: '%s' should be '%s'" %
  708. (file_count, repetitions))
  709. def test_refresh(self):
  710. # Update the table of contents
  711. self.assertEqual(self._box._toc, {})
  712. key0 = self._box.add(self._template % 0)
  713. key1 = self._box.add(self._template % 1)
  714. self.assertEqual(self._box._toc, {})
  715. self._box._refresh()
  716. self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
  717. key1: os.path.join('new', key1)})
  718. key2 = self._box.add(self._template % 2)
  719. self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
  720. key1: os.path.join('new', key1)})
  721. self._box._refresh()
  722. self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
  723. key1: os.path.join('new', key1),
  724. key2: os.path.join('new', key2)})
  725. def test_refresh_after_safety_period(self):
  726. # Issue #13254: Call _refresh after the "file system safety
  727. # period" of 2 seconds has passed; _toc should still be
  728. # updated because this is the first call to _refresh.
  729. key0 = self._box.add(self._template % 0)
  730. key1 = self._box.add(self._template % 1)
  731. self._box = self._factory(self._path)
  732. self.assertEqual(self._box._toc, {})
  733. # Emulate sleeping. Instead of sleeping for 2 seconds, use the
  734. # skew factor to make _refresh think that the filesystem
  735. # safety period has passed and re-reading the _toc is only
  736. # required if mtimes differ.
  737. self._box._skewfactor = -3
  738. self._box._refresh()
  739. self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
  740. def test_lookup(self):
  741. # Look up message subpaths in the TOC
  742. self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
  743. key0 = self._box.add(self._template % 0)
  744. self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
  745. os.remove(os.path.join(self._path, 'new', key0))
  746. self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
  747. # Be sure that the TOC is read back from disk (see issue #6896
  748. # about bad mtime behaviour on some systems).
  749. self._box.flush()
  750. self.assertRaises(KeyError, lambda: self._box._lookup(key0))
  751. self.assertEqual(self._box._toc, {})
  752. def test_lock_unlock(self):
  753. # Lock and unlock the mailbox. For Maildir, this does nothing.
  754. self._box.lock()
  755. self._box.unlock()
  756. def test_folder (self):
  757. # Test for bug #1569790: verify that folders returned by .get_folder()
  758. # use the same factory function.
  759. def dummy_factory (s):
  760. return None
  761. box = self._factory(self._path, factory=dummy_factory)
  762. folder = box.add_folder('folder1')
  763. self.assertIs(folder._factory, dummy_factory)
  764. folder1_alias = box.get_folder('folder1')
  765. self.assertIs(folder1_alias._factory, dummy_factory)
  766. def test_directory_in_folder (self):
  767. # Test that mailboxes still work if there's a stray extra directory
  768. # in a folder.
  769. for i in range(10):
  770. self._box.add(mailbox.Message(_sample_message))
  771. # Create a stray directory
  772. os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
  773. # Check that looping still works with the directory present.
  774. for msg in self._box:
  775. pass
  776. @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
  777. def test_file_permissions(self):
  778. # Verify that message files are created without execute permissions
  779. msg = mailbox.MaildirMessage(self._template % 0)
  780. orig_umask = os.umask(0)
  781. try:
  782. key = self._box.add(msg)
  783. finally:
  784. os.umask(orig_umask)
  785. path = os.path.join(self._path, self._box._lookup(key))
  786. mode = os.stat(path).st_mode
  787. self.assertFalse(mode & 0o111)
  788. @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
  789. def test_folder_file_perms(self):
  790. # From bug #3228, we want to verify that the file created inside a Maildir
  791. # subfolder isn't marked as executable.
  792. orig_umask = os.umask(0)
  793. try:
  794. subfolder = self._box.add_folder('subfolder')
  795. finally:
  796. os.umask(orig_umask)
  797. path = os.path.join(subfolder._path, 'maildirfolder')
  798. st = os.stat(path)
  799. perms = st.st_mode
  800. self.assertFalse((perms & 0o111)) # Execute bits should all be off.
  801. def test_reread(self):
  802. # Do an initial unconditional refresh
  803. self._box._refresh()
  804. # Put the last modified times more than two seconds into the past
  805. # (because mtime may have a two second granularity)
  806. for subdir in ('cur', 'new'):
  807. os.utime(os.path.join(self._box._path, subdir),
  808. (time.time()-5,)*2)
  809. # Because mtime has a two second granularity in worst case (FAT), a
  810. # refresh is done unconditionally if called for within
  811. # two-second-plus-a-bit of the last one, just in case the mbox has
  812. # changed; so now we have to wait for that interval to expire.
  813. #
  814. # Because this is a test, emulate sleeping. Instead of
  815. # sleeping for 2 seconds, use the skew factor to make _refresh
  816. # think that 2 seconds have passed and re-reading the _toc is
  817. # only required if mtimes differ.
  818. self._box._skewfactor = -3
  819. # Re-reading causes the ._toc attribute to be assigned a new dictionary
  820. # object, so we'll check that the ._toc attribute isn't a different
  821. # object.
  822. orig_toc = self._box._toc
  823. def refreshed():
  824. return self._box._toc is not orig_toc
  825. self._box._refresh()
  826. self.assertFalse(refreshed())
  827. # Now, write something into cur and remove it. This changes
  828. # the mtime and should cause a re-read. Note that "sleep
  829. # emulation" is still in effect, as skewfactor is -3.
  830. filename = os.path.join(self._path, 'cur', 'stray-file')
  831. os_helper.create_empty_file(filename)
  832. os.unlink(filename)
  833. self._box._refresh()
  834. self.assertTrue(refreshed())
  835. class _TestSingleFile(TestMailbox):
  836. '''Common tests for single-file mailboxes'''
  837. def test_add_doesnt_rewrite(self):
  838. # When only adding messages, flush() should not rewrite the
  839. # mailbox file. See issue #9559.
  840. # Inode number changes if the contents are written to another
  841. # file which is then renamed over the original file. So we
  842. # must check that the inode number doesn't change.
  843. inode_before = os.stat(self._path).st_ino
  844. self._box.add(self._template % 0)
  845. self._box.flush()
  846. inode_after = os.stat(self._path).st_ino
  847. self.assertEqual(inode_before, inode_after)
  848. # Make sure the message was really added
  849. self._box.close()
  850. self._box = self._factory(self._path)
  851. self.assertEqual(len(self._box), 1)
  852. def test_permissions_after_flush(self):
  853. # See issue #5346
  854. # Make the mailbox world writable. It's unlikely that the new
  855. # mailbox file would have these permissions after flush(),
  856. # because umask usually prevents it.
  857. mode = os.stat(self._path).st_mode | 0o666
  858. os.chmod(self._path, mode)
  859. self._box.add(self._template % 0)
  860. i = self._box.add(self._template % 1)
  861. # Need to remove one message to make flush() create a new file
  862. self._box.remove(i)
  863. self._box.flush()
  864. self.assertEqual(os.stat(self._path).st_mode, mode)
  865. class _TestMboxMMDF(_TestSingleFile):
  866. def tearDown(self):
  867. super().tearDown()
  868. self._box.close()
  869. self._delete_recursively(self._path)
  870. for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
  871. os_helper.unlink(lock_remnant)
  872. def assertMailboxEmpty(self):
  873. with open(self._path, 'rb') as f:
  874. self.assertEqual(f.readlines(), [])
  875. def test_get_bytes_from(self):
  876. # Get bytes representations of messages with _unixfrom.
  877. unixfrom = 'From foo@bar blah\n'
  878. key0 = self._box.add(unixfrom + self._template % 0)
  879. key1 = self._box.add(unixfrom + _sample_message)
  880. self.assertEqual(self._box.get_bytes(key0, from_=False),
  881. (self._template % 0).encode('ascii'))
  882. self.assertEqual(self._box.get_bytes(key1, from_=False),
  883. _bytes_sample_message)
  884. self.assertEqual(self._box.get_bytes(key0, from_=True),
  885. (unixfrom + self._template % 0).encode('ascii'))
  886. self.assertEqual(self._box.get_bytes(key1, from_=True),
  887. unixfrom.encode('ascii') + _bytes_sample_message)
  888. def test_get_string_from(self):
  889. # Get string representations of messages with _unixfrom.
  890. unixfrom = 'From foo@bar blah\n'
  891. key0 = self._box.add(unixfrom + self._template % 0)
  892. key1 = self._box.add(unixfrom + _sample_message)
  893. self.assertEqual(self._box.get_string(key0, from_=False),
  894. self._template % 0)
  895. self.assertEqual(self._box.get_string(key1, from_=False).split('\n'),
  896. _sample_message.split('\n'))
  897. self.assertEqual(self._box.get_string(key0, from_=True),
  898. unixfrom + self._template % 0)
  899. self.assertEqual(self._box.get_string(key1, from_=True).split('\n'),
  900. (unixfrom + _sample_message).split('\n'))
  901. def test_add_from_string(self):
  902. # Add a string starting with 'From ' to the mailbox
  903. key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
  904. self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
  905. self.assertEqual(self._box[key].get_payload(), '0\n')
  906. def test_add_from_bytes(self):
  907. # Add a byte string starting with 'From ' to the mailbox
  908. key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n')
  909. self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
  910. self.assertEqual(self._box[key].get_payload(), '0\n')
  911. def test_add_mbox_or_mmdf_message(self):
  912. # Add an mboxMessage or MMDFMessage
  913. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  914. msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
  915. key = self._box.add(msg)
  916. def test_open_close_open(self):
  917. # Open and inspect previously-created mailbox
  918. values = [self._template % i for i in range(3)]
  919. for value in values:
  920. self._box.add(value)
  921. self._box.close()
  922. mtime = os.path.getmtime(self._path)
  923. self._box = self._factory(self._path)
  924. self.assertEqual(len(self._box), 3)
  925. for key in self._box.iterkeys():
  926. self.assertIn(self._box.get_string(key), values)
  927. self._box.close()
  928. self.assertEqual(mtime, os.path.getmtime(self._path))
  929. def test_add_and_close(self):
  930. # Verifying that closing a mailbox doesn't change added items
  931. self._box.add(_sample_message)
  932. for i in range(3):
  933. self._box.add(self._template % i)
  934. self._box.add(_sample_message)
  935. self._box._file.flush()
  936. self._box._file.seek(0)
  937. contents = self._box._file.read()
  938. self._box.close()
  939. with open(self._path, 'rb') as f:
  940. self.assertEqual(contents, f.read())
  941. self._box = self._factory(self._path)
  942. @support.requires_fork()
  943. @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
  944. def test_lock_conflict(self):
  945. # Fork off a child process that will lock the mailbox temporarily,
  946. # unlock it and exit.
  947. c, p = socket.socketpair()
  948. self.addCleanup(c.close)
  949. self.addCleanup(p.close)
  950. pid = os.fork()
  951. if pid == 0:
  952. # child
  953. try:
  954. # lock the mailbox, and signal the parent it can proceed
  955. self._box.lock()
  956. c.send(b'c')
  957. # wait until the parent is done, and unlock the mailbox
  958. c.recv(1)
  959. self._box.unlock()
  960. finally:
  961. os._exit(0)
  962. # In the parent, wait until the child signals it locked the mailbox.
  963. p.recv(1)
  964. try:
  965. self.assertRaises(mailbox.ExternalClashError,
  966. self._box.lock)
  967. finally:
  968. # Signal the child it can now release the lock and exit.
  969. p.send(b'p')
  970. # Wait for child to exit. Locking should now succeed.
  971. support.wait_process(pid, exitcode=0)
  972. self._box.lock()
  973. self._box.unlock()
  974. def test_relock(self):
  975. # Test case for bug #1575506: the mailbox class was locking the
  976. # wrong file object in its flush() method.
  977. msg = "Subject: sub\n\nbody\n"
  978. key1 = self._box.add(msg)
  979. self._box.flush()
  980. self._box.close()
  981. self._box = self._factory(self._path)
  982. self._box.lock()
  983. key2 = self._box.add(msg)
  984. self._box.flush()
  985. self.assertTrue(self._box._locked)
  986. self._box.close()
  987. class TestMbox(_TestMboxMMDF, unittest.TestCase):
  988. _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
  989. @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
  990. def test_file_perms(self):
  991. # From bug #3228, we want to verify that the mailbox file isn't executable,
  992. # even if the umask is set to something that would leave executable bits set.
  993. # We only run this test on platforms that support umask.
  994. try:
  995. old_umask = os.umask(0o077)
  996. self._box.close()
  997. os.unlink(self._path)
  998. self._box = mailbox.mbox(self._path, create=True)
  999. self._box.add('')
  1000. self._box.close()
  1001. finally:
  1002. os.umask(old_umask)
  1003. st = os.stat(self._path)
  1004. perms = st.st_mode
  1005. self.assertFalse((perms & 0o111)) # Execute bits should all be off.
  1006. def test_terminating_newline(self):
  1007. message = email.message.Message()
  1008. message['From'] = 'john@example.com'
  1009. message.set_payload('No newline at the end')
  1010. i = self._box.add(message)
  1011. # A newline should have been appended to the payload
  1012. message = self._box.get(i)
  1013. self.assertEqual(message.get_payload(), 'No newline at the end\n')
  1014. def test_message_separator(self):
  1015. # Check there's always a single blank line after each message
  1016. self._box.add('From: foo\n\n0') # No newline at the end
  1017. with open(self._path, encoding='utf-8') as f:
  1018. data = f.read()
  1019. self.assertEqual(data[-3:], '0\n\n')
  1020. self._box.add('From: foo\n\n0\n') # Newline at the end
  1021. with open(self._path, encoding='utf-8') as f:
  1022. data = f.read()
  1023. self.assertEqual(data[-3:], '0\n\n')
  1024. class TestMMDF(_TestMboxMMDF, unittest.TestCase):
  1025. _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
  1026. class TestMH(TestMailbox, unittest.TestCase):
  1027. _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
  1028. def assertMailboxEmpty(self):
  1029. self.assertEqual(os.listdir(self._path), ['.mh_sequences'])
  1030. def test_list_folders(self):
  1031. # List folders
  1032. self._box.add_folder('one')
  1033. self._box.add_folder('two')
  1034. self._box.add_folder('three')
  1035. self.assertEqual(len(self._box.list_folders()), 3)
  1036. self.assertEqual(set(self._box.list_folders()),
  1037. set(('one', 'two', 'three')))
  1038. def test_get_folder(self):
  1039. # Open folders
  1040. def dummy_factory (s):
  1041. return None
  1042. self._box = self._factory(self._path, dummy_factory)
  1043. new_folder = self._box.add_folder('foo.bar')
  1044. folder0 = self._box.get_folder('foo.bar')
  1045. folder0.add(self._template % 'bar')
  1046. self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
  1047. folder1 = self._box.get_folder('foo.bar')
  1048. self.assertEqual(folder1.get_string(folder1.keys()[0]),
  1049. self._template % 'bar')
  1050. # Test for bug #1569790: verify that folders returned by .get_folder()
  1051. # use the same factory function.
  1052. self.assertIs(new_folder._factory, self._box._factory)
  1053. self.assertIs(folder0._factory, self._box._factory)
  1054. def test_add_and_remove_folders(self):
  1055. # Delete folders
  1056. self._box.add_folder('one')
  1057. self._box.add_folder('two')
  1058. self.assertEqual(len(self._box.list_folders()), 2)
  1059. self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
  1060. self._box.remove_folder('one')
  1061. self.assertEqual(len(self._box.list_folders()), 1)
  1062. self.assertEqual(set(self._box.list_folders()), set(('two',)))
  1063. self._box.add_folder('three')
  1064. self.assertEqual(len(self._box.list_folders()), 2)
  1065. self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
  1066. self._box.remove_folder('three')
  1067. self.assertEqual(len(self._box.list_folders()), 1)
  1068. self.assertEqual(set(self._box.list_folders()), set(('two',)))
  1069. self._box.remove_folder('two')
  1070. self.assertEqual(len(self._box.list_folders()), 0)
  1071. self.assertEqual(self._box.list_folders(), [])
  1072. def test_sequences(self):
  1073. # Get and set sequences
  1074. self.assertEqual(self._box.get_sequences(), {})
  1075. msg0 = mailbox.MHMessage(self._template % 0)
  1076. msg0.add_sequence('foo')
  1077. key0 = self._box.add(msg0)
  1078. self.assertEqual(self._box.get_sequences(), {'foo':[key0]})
  1079. msg1 = mailbox.MHMessage(self._template % 1)
  1080. msg1.set_sequences(['bar', 'replied', 'foo'])
  1081. key1 = self._box.add(msg1)
  1082. self.assertEqual(self._box.get_sequences(),
  1083. {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
  1084. msg0.set_sequences(['flagged'])
  1085. self._box[key0] = msg0
  1086. self.assertEqual(self._box.get_sequences(),
  1087. {'foo':[key1], 'bar':[key1], 'replied':[key1],
  1088. 'flagged':[key0]})
  1089. self._box.remove(key1)
  1090. self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})
  1091. def test_issue2625(self):
  1092. msg0 = mailbox.MHMessage(self._template % 0)
  1093. msg0.add_sequence('foo')
  1094. key0 = self._box.add(msg0)
  1095. refmsg0 = self._box.get_message(key0)
  1096. def test_issue7627(self):
  1097. msg0 = mailbox.MHMessage(self._template % 0)
  1098. key0 = self._box.add(msg0)
  1099. self._box.lock()
  1100. self._box.remove(key0)
  1101. self._box.unlock()
  1102. def test_pack(self):
  1103. # Pack the contents of the mailbox
  1104. msg0 = mailbox.MHMessage(self._template % 0)
  1105. msg1 = mailbox.MHMessage(self._template % 1)
  1106. msg2 = mailbox.MHMessage(self._template % 2)
  1107. msg3 = mailbox.MHMessage(self._template % 3)
  1108. msg0.set_sequences(['foo', 'unseen'])
  1109. msg1.set_sequences(['foo'])
  1110. msg2.set_sequences(['foo', 'flagged'])
  1111. msg3.set_sequences(['foo', 'bar', 'replied'])
  1112. key0 = self._box.add(msg0)
  1113. key1 = self._box.add(msg1)
  1114. key2 = self._box.add(msg2)
  1115. key3 = self._box.add(msg3)
  1116. self.assertEqual(self._box.get_sequences(),
  1117. {'foo':[key0,key1,key2,key3], 'unseen':[key0],
  1118. 'flagged':[key2], 'bar':[key3], 'replied':[key3]})
  1119. self._box.remove(key2)
  1120. self.assertEqual(self._box.get_sequences(),
  1121. {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
  1122. 'replied':[key3]})
  1123. self._box.pack()
  1124. self.assertEqual(self._box.keys(), [1, 2, 3])
  1125. key0 = key0
  1126. key1 = key0 + 1
  1127. key2 = key1 + 1
  1128. self.assertEqual(self._box.get_sequences(),
  1129. {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
  1130. # Test case for packing while holding the mailbox locked.
  1131. key0 = self._box.add(msg1)
  1132. key1 = self._box.add(msg1)
  1133. key2 = self._box.add(msg1)
  1134. key3 = self._box.add(msg1)
  1135. self._box.remove(key0)
  1136. self._box.remove(key2)
  1137. self._box.lock()
  1138. self._box.pack()
  1139. self._box.unlock()
  1140. self.assertEqual(self._box.get_sequences(),
  1141. {'foo':[1, 2, 3, 4, 5],
  1142. 'unseen':[1], 'bar':[3], 'replied':[3]})
  1143. def _get_lock_path(self):
  1144. return os.path.join(self._path, '.mh_sequences.lock')
  1145. class TestBabyl(_TestSingleFile, unittest.TestCase):
  1146. _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
  1147. def assertMailboxEmpty(self):
  1148. with open(self._path, 'rb') as f:
  1149. self.assertEqual(f.readlines(), [])
  1150. def tearDown(self):
  1151. super().tearDown()
  1152. self._box.close()
  1153. self._delete_recursively(self._path)
  1154. for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
  1155. os_helper.unlink(lock_remnant)
  1156. def test_labels(self):
  1157. # Get labels from the mailbox
  1158. self.assertEqual(self._box.get_labels(), [])
  1159. msg0 = mailbox.BabylMessage(self._template % 0)
  1160. msg0.add_label('foo')
  1161. key0 = self._box.add(msg0)
  1162. self.assertEqual(self._box.get_labels(), ['foo'])
  1163. msg1 = mailbox.BabylMessage(self._template % 1)
  1164. msg1.set_labels(['bar', 'answered', 'foo'])
  1165. key1 = self._box.add(msg1)
  1166. self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
  1167. msg0.set_labels(['blah', 'filed'])
  1168. self._box[key0] = msg0
  1169. self.assertEqual(set(self._box.get_labels()),
  1170. set(['foo', 'bar', 'blah']))
  1171. self._box.remove(key1)
  1172. self.assertEqual(set(self._box.get_labels()), set(['blah']))
  1173. class FakeFileLikeObject:
  1174. def __init__(self):
  1175. self.closed = False
  1176. def close(self):
  1177. self.closed = True
  1178. class FakeMailBox(mailbox.Mailbox):
  1179. def __init__(self):
  1180. mailbox.Mailbox.__init__(self, '', lambda file: None)
  1181. self.files = [FakeFileLikeObject() for i in range(10)]
  1182. def get_file(self, key):
  1183. return self.files[key]
  1184. class TestFakeMailBox(unittest.TestCase):
  1185. def test_closing_fd(self):
  1186. box = FakeMailBox()
  1187. for i in range(10):
  1188. self.assertFalse(box.files[i].closed)
  1189. for i in range(10):
  1190. box[i]
  1191. for i in range(10):
  1192. self.assertTrue(box.files[i].closed)
  1193. class TestMessage(TestBase, unittest.TestCase):
  1194. _factory = mailbox.Message # Overridden by subclasses to reuse tests
  1195. def setUp(self):
  1196. self._path = os_helper.TESTFN
  1197. def tearDown(self):
  1198. self._delete_recursively(self._path)
  1199. def test_initialize_with_eMM(self):
  1200. # Initialize based on email.message.Message instance
  1201. eMM = email.message_from_string(_sample_message)
  1202. msg = self._factory(eMM)
  1203. self._post_initialize_hook(msg)
  1204. self._check_sample(msg)
  1205. def test_initialize_with_string(self):
  1206. # Initialize based on string
  1207. msg = self._factory(_sample_message)
  1208. self._post_initialize_hook(msg)
  1209. self._check_sample(msg)
  1210. def test_initialize_with_file(self):
  1211. # Initialize based on contents of file
  1212. with open(self._path, 'w+', encoding='utf-8') as f:
  1213. f.write(_sample_message)
  1214. f.seek(0)
  1215. msg = self._factory(f)
  1216. self._post_initialize_hook(msg)
  1217. self._check_sample(msg)
  1218. def test_initialize_with_binary_file(self):
  1219. # Initialize based on contents of binary file
  1220. with open(self._path, 'wb+') as f:
  1221. f.write(_bytes_sample_message)
  1222. f.seek(0)
  1223. msg = self._factory(f)
  1224. self._post_initialize_hook(msg)
  1225. self._check_sample(msg)
  1226. def test_initialize_with_nothing(self):
  1227. # Initialize without arguments
  1228. msg = self._factory()
  1229. self._post_initialize_hook(msg)
  1230. self.assertIsInstance(msg, email.message.Message)
  1231. self.assertIsInstance(msg, mailbox.Message)
  1232. self.assertIsInstance(msg, self._factory)
  1233. self.assertEqual(msg.keys(), [])
  1234. self.assertFalse(msg.is_multipart())
  1235. self.assertIsNone(msg.get_payload())
  1236. def test_initialize_incorrectly(self):
  1237. # Initialize with invalid argument
  1238. self.assertRaises(TypeError, lambda: self._factory(object()))
  1239. def test_all_eMM_attributes_exist(self):
  1240. # Issue 12537
  1241. eMM = email.message_from_string(_sample_message)
  1242. msg = self._factory(_sample_message)
  1243. for attr in eMM.__dict__:
  1244. self.assertIn(attr, msg.__dict__,
  1245. '{} attribute does not exist'.format(attr))
  1246. def test_become_message(self):
  1247. # Take on the state of another message
  1248. eMM = email.message_from_string(_sample_message)
  1249. msg = self._factory()
  1250. msg._become_message(eMM)
  1251. self._check_sample(msg)
  1252. def test_explain_to(self):
  1253. # Copy self's format-specific data to other message formats.
  1254. # This test is superficial; better ones are in TestMessageConversion.
  1255. msg = self._factory()
  1256. for class_ in self.all_mailbox_types:
  1257. other_msg = class_()
  1258. msg._explain_to(other_msg)
  1259. other_msg = email.message.Message()
  1260. self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
  1261. def _post_initialize_hook(self, msg):
  1262. # Overridden by subclasses to check extra things after initialization
  1263. pass
  1264. class TestMaildirMessage(TestMessage, unittest.TestCase):
  1265. _factory = mailbox.MaildirMessage
  1266. def _post_initialize_hook(self, msg):
  1267. self.assertEqual(msg._subdir, 'new')
  1268. self.assertEqual(msg._info, '')
  1269. def test_subdir(self):
  1270. # Use get_subdir() and set_subdir()
  1271. msg = mailbox.MaildirMessage(_sample_message)
  1272. self.assertEqual(msg.get_subdir(), 'new')
  1273. msg.set_subdir('cur')
  1274. self.assertEqual(msg.get_subdir(), 'cur')
  1275. msg.set_subdir('new')
  1276. self.assertEqual(msg.get_subdir(), 'new')
  1277. self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
  1278. self.assertEqual(msg.get_subdir(), 'new')
  1279. msg.set_subdir('new')
  1280. self.assertEqual(msg.get_subdir(), 'new')
  1281. self._check_sample(msg)
  1282. def test_flags(self):
  1283. # Use get_flags(), set_flags(), add_flag(), remove_flag()
  1284. msg = mailbox.MaildirMessage(_sample_message)
  1285. self.assertEqual(msg.get_flags(), '')
  1286. self.assertEqual(msg.get_subdir(), 'new')
  1287. msg.set_flags('F')
  1288. self.assertEqual(msg.get_subdir(), 'new')
  1289. self.assertEqual(msg.get_flags(), 'F')
  1290. msg.set_flags('SDTP')
  1291. self.assertEqual(msg.get_flags(), 'DPST')
  1292. msg.add_flag('FT')
  1293. self.assertEqual(msg.get_flags(), 'DFPST')
  1294. msg.remove_flag('TDRP')
  1295. self.assertEqual(msg.get_flags(), 'FS')
  1296. self.assertEqual(msg.get_subdir(), 'new')
  1297. self._check_sample(msg)
  1298. def test_date(self):
  1299. # Use get_date() and set_date()
  1300. msg = mailbox.MaildirMessage(_sample_message)
  1301. self.assertLess(abs(msg.get_date() - time.time()), 60)
  1302. msg.set_date(0.0)
  1303. self.assertEqual(msg.get_date(), 0.0)
  1304. def test_info(self):
  1305. # Use get_info() and set_info()
  1306. msg = mailbox.MaildirMessage(_sample_message)
  1307. self.assertEqual(msg.get_info(), '')
  1308. msg.set_info('1,foo=bar')
  1309. self.assertEqual(msg.get_info(), '1,foo=bar')
  1310. self.assertRaises(TypeError, lambda: msg.set_info(None))
  1311. self._check_sample(msg)
  1312. def test_info_and_flags(self):
  1313. # Test interaction of info and flag methods
  1314. msg = mailbox.MaildirMessage(_sample_message)
  1315. self.assertEqual(msg.get_info(), '')
  1316. msg.set_flags('SF')
  1317. self.assertEqual(msg.get_flags(), 'FS')
  1318. self.assertEqual(msg.get_info(), '2,FS')
  1319. msg.set_info('1,')
  1320. self.assertEqual(msg.get_flags(), '')
  1321. self.assertEqual(msg.get_info(), '1,')
  1322. msg.remove_flag('RPT')
  1323. self.assertEqual(msg.get_flags(), '')
  1324. self.assertEqual(msg.get_info(), '1,')
  1325. msg.add_flag('D')
  1326. self.assertEqual(msg.get_flags(), 'D')
  1327. self.assertEqual(msg.get_info(), '2,D')
  1328. self._check_sample(msg)
  1329. class _TestMboxMMDFMessage:
  1330. _factory = mailbox._mboxMMDFMessage
  1331. def _post_initialize_hook(self, msg):
  1332. self._check_from(msg)
  1333. def test_initialize_with_unixfrom(self):
  1334. # Initialize with a message that already has a _unixfrom attribute
  1335. msg = mailbox.Message(_sample_message)
  1336. msg.set_unixfrom('From foo@bar blah')
  1337. msg = mailbox.mboxMessage(msg)
  1338. self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())
  1339. def test_from(self):
  1340. # Get and set "From " line
  1341. msg = mailbox.mboxMessage(_sample_message)
  1342. self._check_from(msg)
  1343. msg.set_from('foo bar')
  1344. self.assertEqual(msg.get_from(), 'foo bar')
  1345. msg.set_from('foo@bar', True)
  1346. self._check_from(msg, 'foo@bar')
  1347. msg.set_from('blah@temp', time.localtime())
  1348. self._check_from(msg, 'blah@temp')
  1349. def test_flags(self):
  1350. # Use get_flags(), set_flags(), add_flag(), remove_flag()
  1351. msg = mailbox.mboxMessage(_sample_message)
  1352. self.assertEqual(msg.get_flags(), '')
  1353. msg.set_flags('F')
  1354. self.assertEqual(msg.get_flags(), 'F')
  1355. msg.set_flags('XODR')
  1356. self.assertEqual(msg.get_flags(), 'RODX')
  1357. msg.add_flag('FA')
  1358. self.assertEqual(msg.get_flags(), 'RODFAX')
  1359. msg.remove_flag('FDXA')
  1360. self.assertEqual(msg.get_flags(), 'RO')
  1361. self._check_sample(msg)
  1362. def _check_from(self, msg, sender=None):
  1363. # Check contents of "From " line
  1364. if sender is None:
  1365. sender = "MAILER-DAEMON"
  1366. self.assertIsNotNone(re.match(
  1367. sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}",
  1368. msg.get_from()))
  1369. class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
  1370. _factory = mailbox.mboxMessage
  1371. class TestMHMessage(TestMessage, unittest.TestCase):
  1372. _factory = mailbox.MHMessage
  1373. def _post_initialize_hook(self, msg):
  1374. self.assertEqual(msg._sequences, [])
  1375. def test_sequences(self):
  1376. # Get, set, join, and leave sequences
  1377. msg = mailbox.MHMessage(_sample_message)
  1378. self.assertEqual(msg.get_sequences(), [])
  1379. msg.set_sequences(['foobar'])
  1380. self.assertEqual(msg.get_sequences(), ['foobar'])
  1381. msg.set_sequences([])
  1382. self.assertEqual(msg.get_sequences(), [])
  1383. msg.add_sequence('unseen')
  1384. self.assertEqual(msg.get_sequences(), ['unseen'])
  1385. msg.add_sequence('flagged')
  1386. self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
  1387. msg.add_sequence('flagged')
  1388. self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
  1389. msg.remove_sequence('unseen')
  1390. self.assertEqual(msg.get_sequences(), ['flagged'])
  1391. msg.add_sequence('foobar')
  1392. self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
  1393. msg.remove_sequence('replied')
  1394. self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
  1395. msg.set_sequences(['foobar', 'replied'])
  1396. self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
  1397. class TestBabylMessage(TestMessage, unittest.TestCase):
  1398. _factory = mailbox.BabylMessage
  1399. def _post_initialize_hook(self, msg):
  1400. self.assertEqual(msg._labels, [])
  1401. def test_labels(self):
  1402. # Get, set, join, and leave labels
  1403. msg = mailbox.BabylMessage(_sample_message)
  1404. self.assertEqual(msg.get_labels(), [])
  1405. msg.set_labels(['foobar'])
  1406. self.assertEqual(msg.get_labels(), ['foobar'])
  1407. msg.set_labels([])
  1408. self.assertEqual(msg.get_labels(), [])
  1409. msg.add_label('filed')
  1410. self.assertEqual(msg.get_labels(), ['filed'])
  1411. msg.add_label('resent')
  1412. self.assertEqual(msg.get_labels(), ['filed', 'resent'])
  1413. msg.add_label('resent')
  1414. self.assertEqual(msg.get_labels(), ['filed', 'resent'])
  1415. msg.remove_label('filed')
  1416. self.assertEqual(msg.get_labels(), ['resent'])
  1417. msg.add_label('foobar')
  1418. self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
  1419. msg.remove_label('unseen')
  1420. self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
  1421. msg.set_labels(['foobar', 'answered'])
  1422. self.assertEqual(msg.get_labels(), ['foobar', 'answered'])
  1423. def test_visible(self):
  1424. # Get, set, and update visible headers
  1425. msg = mailbox.BabylMessage(_sample_message)
  1426. visible = msg.get_visible()
  1427. self.assertEqual(visible.keys(), [])
  1428. self.assertIsNone(visible.get_payload())
  1429. visible['User-Agent'] = 'FooBar 1.0'
  1430. visible['X-Whatever'] = 'Blah'
  1431. self.assertEqual(msg.get_visible().keys(), [])
  1432. msg.set_visible(visible)
  1433. visible = msg.get_visible()
  1434. self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
  1435. self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
  1436. self.assertEqual(visible['X-Whatever'], 'Blah')
  1437. self.assertIsNone(visible.get_payload())
  1438. msg.update_visible()
  1439. self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
  1440. self.assertIsNone(visible.get_payload())
  1441. visible = msg.get_visible()
  1442. self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
  1443. 'Subject'])
  1444. for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
  1445. self.assertEqual(visible[header], msg[header])
  1446. class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
  1447. _factory = mailbox.MMDFMessage
  1448. class TestMessageConversion(TestBase, unittest.TestCase):
  1449. def test_plain_to_x(self):
  1450. # Convert Message to all formats
  1451. for class_ in self.all_mailbox_types:
  1452. msg_plain = mailbox.Message(_sample_message)
  1453. msg = class_(msg_plain)
  1454. self._check_sample(msg)
  1455. def test_x_to_plain(self):
  1456. # Convert all formats to Message
  1457. for class_ in self.all_mailbox_types:
  1458. msg = class_(_sample_message)
  1459. msg_plain = mailbox.Message(msg)
  1460. self._check_sample(msg_plain)
  1461. def test_x_from_bytes(self):
  1462. # Convert all formats to Message
  1463. for class_ in self.all_mailbox_types:
  1464. msg = class_(_bytes_sample_message)
  1465. self._check_sample(msg)
  1466. def test_x_to_invalid(self):
  1467. # Convert all formats to an invalid format
  1468. for class_ in self.all_mailbox_types:
  1469. self.assertRaises(TypeError, lambda: class_(False))
  1470. def test_type_specific_attributes_removed_on_conversion(self):
  1471. reference = {class_: class_(_sample_message).__dict__
  1472. for class_ in self.all_mailbox_types}
  1473. for class1 in self.all_mailbox_types:
  1474. for class2 in self.all_mailbox_types:
  1475. if class1 is class2:
  1476. continue
  1477. source = class1(_sample_message)
  1478. target = class2(source)
  1479. type_specific = [a for a in reference[class1]
  1480. if a not in reference[class2]]
  1481. for attr in type_specific:
  1482. self.assertNotIn(attr, target.__dict__,
  1483. "while converting {} to {}".format(class1, class2))
  1484. def test_maildir_to_maildir(self):
  1485. # Convert MaildirMessage to MaildirMessage
  1486. msg_maildir = mailbox.MaildirMessage(_sample_message)
  1487. msg_maildir.set_flags('DFPRST')
  1488. msg_maildir.set_subdir('cur')
  1489. date = msg_maildir.get_date()
  1490. msg = mailbox.MaildirMessage(msg_maildir)
  1491. self._check_sample(msg)
  1492. self.assertEqual(msg.get_flags(), 'DFPRST')
  1493. self.assertEqual(msg.get_subdir(), 'cur')
  1494. self.assertEqual(msg.get_date(), date)
  1495. def test_maildir_to_mboxmmdf(self):
  1496. # Convert MaildirMessage to mboxmessage and MMDFMessage
  1497. pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
  1498. ('T', 'D'), ('DFPRST', 'RDFA'))
  1499. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1500. msg_maildir = mailbox.MaildirMessage(_sample_message)
  1501. msg_maildir.set_date(0.0)
  1502. for setting, result in pairs:
  1503. msg_maildir.set_flags(setting)
  1504. msg = class_(msg_maildir)
  1505. self.assertEqual(msg.get_flags(), result)
  1506. self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %
  1507. time.asctime(time.gmtime(0.0)))
  1508. msg_maildir.set_subdir('cur')
  1509. self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')
  1510. def test_maildir_to_mh(self):
  1511. # Convert MaildirMessage to MHMessage
  1512. msg_maildir = mailbox.MaildirMessage(_sample_message)
  1513. pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
  1514. ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
  1515. ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
  1516. for setting, result in pairs:
  1517. msg_maildir.set_flags(setting)
  1518. self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),
  1519. result)
  1520. def test_maildir_to_babyl(self):
  1521. # Convert MaildirMessage to Babyl
  1522. msg_maildir = mailbox.MaildirMessage(_sample_message)
  1523. pairs = (('D', ['unseen']), ('F', ['unseen']),
  1524. ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
  1525. ('S', []), ('T', ['unseen', 'deleted']),
  1526. ('DFPRST', ['deleted', 'answered', 'forwarded']))
  1527. for setting, result in pairs:
  1528. msg_maildir.set_flags(setting)
  1529. self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),
  1530. result)
  1531. def test_mboxmmdf_to_maildir(self):
  1532. # Convert mboxMessage and MMDFMessage to MaildirMessage
  1533. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1534. msg_mboxMMDF = class_(_sample_message)
  1535. msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
  1536. pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
  1537. ('RODFA', 'FRST'))
  1538. for setting, result in pairs:
  1539. msg_mboxMMDF.set_flags(setting)
  1540. msg = mailbox.MaildirMessage(msg_mboxMMDF)
  1541. self.assertEqual(msg.get_flags(), result)
  1542. self.assertEqual(msg.get_date(), 0.0)
  1543. msg_mboxMMDF.set_flags('O')
  1544. self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(),
  1545. 'cur')
  1546. def test_mboxmmdf_to_mboxmmdf(self):
  1547. # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
  1548. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1549. msg_mboxMMDF = class_(_sample_message)
  1550. msg_mboxMMDF.set_flags('RODFA')
  1551. msg_mboxMMDF.set_from('foo@bar')
  1552. for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1553. msg2 = class2_(msg_mboxMMDF)
  1554. self.assertEqual(msg2.get_flags(), 'RODFA')
  1555. self.assertEqual(msg2.get_from(), 'foo@bar')
  1556. def test_mboxmmdf_to_mh(self):
  1557. # Convert mboxMessage and MMDFMessage to MHMessage
  1558. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1559. msg_mboxMMDF = class_(_sample_message)
  1560. pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
  1561. ('F', ['unseen', 'flagged']),
  1562. ('A', ['unseen', 'replied']),
  1563. ('RODFA', ['replied', 'flagged']))
  1564. for setting, result in pairs:
  1565. msg_mboxMMDF.set_flags(setting)
  1566. self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(),
  1567. result)
  1568. def test_mboxmmdf_to_babyl(self):
  1569. # Convert mboxMessage and MMDFMessage to BabylMessage
  1570. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1571. msg = class_(_sample_message)
  1572. pairs = (('R', []), ('O', ['unseen']),
  1573. ('D', ['unseen', 'deleted']), ('F', ['unseen']),
  1574. ('A', ['unseen', 'answered']),
  1575. ('RODFA', ['deleted', 'answered']))
  1576. for setting, result in pairs:
  1577. msg.set_flags(setting)
  1578. self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
  1579. def test_mh_to_maildir(self):
  1580. # Convert MHMessage to MaildirMessage
  1581. pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
  1582. for setting, result in pairs:
  1583. msg = mailbox.MHMessage(_sample_message)
  1584. msg.add_sequence(setting)
  1585. self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
  1586. self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
  1587. msg = mailbox.MHMessage(_sample_message)
  1588. msg.add_sequence('unseen')
  1589. msg.add_sequence('replied')
  1590. msg.add_sequence('flagged')
  1591. self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')
  1592. self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
  1593. def test_mh_to_mboxmmdf(self):
  1594. # Convert MHMessage to mboxMessage and MMDFMessage
  1595. pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
  1596. for setting, result in pairs:
  1597. msg = mailbox.MHMessage(_sample_message)
  1598. msg.add_sequence(setting)
  1599. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1600. self.assertEqual(class_(msg).get_flags(), result)
  1601. msg = mailbox.MHMessage(_sample_message)
  1602. msg.add_sequence('unseen')
  1603. msg.add_sequence('replied')
  1604. msg.add_sequence('flagged')
  1605. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1606. self.assertEqual(class_(msg).get_flags(), 'OFA')
  1607. def test_mh_to_mh(self):
  1608. # Convert MHMessage to MHMessage
  1609. msg = mailbox.MHMessage(_sample_message)
  1610. msg.add_sequence('unseen')
  1611. msg.add_sequence('replied')
  1612. msg.add_sequence('flagged')
  1613. self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
  1614. ['unseen', 'replied', 'flagged'])
  1615. def test_mh_to_babyl(self):
  1616. # Convert MHMessage to BabylMessage
  1617. pairs = (('unseen', ['unseen']), ('replied', ['answered']),
  1618. ('flagged', []))
  1619. for setting, result in pairs:
  1620. msg = mailbox.MHMessage(_sample_message)
  1621. msg.add_sequence(setting)
  1622. self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
  1623. msg = mailbox.MHMessage(_sample_message)
  1624. msg.add_sequence('unseen')
  1625. msg.add_sequence('replied')
  1626. msg.add_sequence('flagged')
  1627. self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
  1628. ['unseen', 'answered'])
  1629. def test_babyl_to_maildir(self):
  1630. # Convert BabylMessage to MaildirMessage
  1631. pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
  1632. ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
  1633. ('resent', 'PS'))
  1634. for setting, result in pairs:
  1635. msg = mailbox.BabylMessage(_sample_message)
  1636. msg.add_label(setting)
  1637. self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
  1638. self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
  1639. msg = mailbox.BabylMessage(_sample_message)
  1640. for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1641. 'edited', 'resent'):
  1642. msg.add_label(label)
  1643. self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')
  1644. self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
  1645. def test_babyl_to_mboxmmdf(self):
  1646. # Convert BabylMessage to mboxMessage and MMDFMessage
  1647. pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
  1648. ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
  1649. ('resent', 'RO'))
  1650. for setting, result in pairs:
  1651. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1652. msg = mailbox.BabylMessage(_sample_message)
  1653. msg.add_label(setting)
  1654. self.assertEqual(class_(msg).get_flags(), result)
  1655. msg = mailbox.BabylMessage(_sample_message)
  1656. for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1657. 'edited', 'resent'):
  1658. msg.add_label(label)
  1659. for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1660. self.assertEqual(class_(msg).get_flags(), 'ODA')
  1661. def test_babyl_to_mh(self):
  1662. # Convert BabylMessage to MHMessage
  1663. pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
  1664. ('answered', ['replied']), ('forwarded', []), ('edited', []),
  1665. ('resent', []))
  1666. for setting, result in pairs:
  1667. msg = mailbox.BabylMessage(_sample_message)
  1668. msg.add_label(setting)
  1669. self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)
  1670. msg = mailbox.BabylMessage(_sample_message)
  1671. for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1672. 'edited', 'resent'):
  1673. msg.add_label(label)
  1674. self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
  1675. ['unseen', 'replied'])
  1676. def test_babyl_to_babyl(self):
  1677. # Convert BabylMessage to BabylMessage
  1678. msg = mailbox.BabylMessage(_sample_message)
  1679. msg.update_visible()
  1680. for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1681. 'edited', 'resent'):
  1682. msg.add_label(label)
  1683. msg2 = mailbox.BabylMessage(msg)
  1684. self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',
  1685. 'answered', 'forwarded', 'edited',
  1686. 'resent'])
  1687. self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())
  1688. for key in msg.get_visible().keys():
  1689. self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])
  1690. class TestProxyFileBase(TestBase):
  1691. def _test_read(self, proxy):
  1692. # Read by byte
  1693. proxy.seek(0)
  1694. self.assertEqual(proxy.read(), b'bar')
  1695. proxy.seek(1)
  1696. self.assertEqual(proxy.read(), b'ar')
  1697. proxy.seek(0)
  1698. self.assertEqual(proxy.read(2), b'ba')
  1699. proxy.seek(1)
  1700. self.assertEqual(proxy.read(-1), b'ar')
  1701. proxy.seek(2)
  1702. self.assertEqual(proxy.read(1000), b'r')
  1703. def _test_readline(self, proxy):
  1704. # Read by line
  1705. linesep = os.linesep.encode()
  1706. proxy.seek(0)
  1707. self.assertEqual(proxy.readline(), b'foo' + linesep)
  1708. self.assertEqual(proxy.readline(), b'bar' + linesep)
  1709. self.assertEqual(proxy.readline(), b'fred' + linesep)
  1710. self.assertEqual(proxy.readline(), b'bob')
  1711. proxy.seek(2)
  1712. self.assertEqual(proxy.readline(), b'o' + linesep)
  1713. proxy.seek(6 + 2 * len(os.linesep))
  1714. self.assertEqual(proxy.readline(), b'fred' + linesep)
  1715. proxy.seek(6 + 2 * len(os.linesep))
  1716. self.assertEqual(proxy.readline(2), b'fr')
  1717. self.assertEqual(proxy.readline(-10), b'ed' + linesep)
  1718. def _test_readlines(self, proxy):
  1719. # Read multiple lines
  1720. linesep = os.linesep.encode()
  1721. proxy.seek(0)
  1722. self.assertEqual(proxy.readlines(), [b'foo' + linesep,
  1723. b'bar' + linesep,
  1724. b'fred' + linesep, b'bob'])
  1725. proxy.seek(0)
  1726. self.assertEqual(proxy.readlines(2), [b'foo' + linesep])
  1727. proxy.seek(3 + len(linesep))
  1728. self.assertEqual(proxy.readlines(4 + len(linesep)),
  1729. [b'bar' + linesep, b'fred' + linesep])
  1730. proxy.seek(3)
  1731. self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep,
  1732. b'fred' + linesep, b'bob'])
  1733. def _test_iteration(self, proxy):
  1734. # Iterate by line
  1735. linesep = os.linesep.encode()
  1736. proxy.seek(0)
  1737. iterator = iter(proxy)
  1738. self.assertEqual(next(iterator), b'foo' + linesep)
  1739. self.assertEqual(next(iterator), b'bar' + linesep)
  1740. self.assertEqual(next(iterator), b'fred' + linesep)
  1741. self.assertEqual(next(iterator), b'bob')
  1742. self.assertRaises(StopIteration, next, iterator)
  1743. def _test_seek_and_tell(self, proxy):
  1744. # Seek and use tell to check position
  1745. linesep = os.linesep.encode()
  1746. proxy.seek(3)
  1747. self.assertEqual(proxy.tell(), 3)
  1748. self.assertEqual(proxy.read(len(linesep)), linesep)
  1749. proxy.seek(2, 1)
  1750. self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep)
  1751. proxy.seek(-3 - len(linesep), 2)
  1752. self.assertEqual(proxy.read(3), b'bar')
  1753. proxy.seek(2, 0)
  1754. self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep)
  1755. proxy.seek(100)
  1756. self.assertFalse(proxy.read())
  1757. def _test_close(self, proxy):
  1758. # Close a file
  1759. self.assertFalse(proxy.closed)
  1760. proxy.close()
  1761. self.assertTrue(proxy.closed)
  1762. # Issue 11700 subsequent closes should be a no-op.
  1763. proxy.close()
  1764. self.assertTrue(proxy.closed)
  1765. class TestProxyFile(TestProxyFileBase, unittest.TestCase):
  1766. def setUp(self):
  1767. self._path = os_helper.TESTFN
  1768. self._file = open(self._path, 'wb+')
  1769. def tearDown(self):
  1770. self._file.close()
  1771. self._delete_recursively(self._path)
  1772. def test_initialize(self):
  1773. # Initialize and check position
  1774. self._file.write(b'foo')
  1775. pos = self._file.tell()
  1776. proxy0 = mailbox._ProxyFile(self._file)
  1777. self.assertEqual(proxy0.tell(), pos)
  1778. self.assertEqual(self._file.tell(), pos)
  1779. proxy1 = mailbox._ProxyFile(self._file, 0)
  1780. self.assertEqual(proxy1.tell(), 0)
  1781. self.assertEqual(self._file.tell(), pos)
  1782. def test_read(self):
  1783. self._file.write(b'bar')
  1784. self._test_read(mailbox._ProxyFile(self._file))
  1785. def test_readline(self):
  1786. self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
  1787. os.linesep), 'ascii'))
  1788. self._test_readline(mailbox._ProxyFile(self._file))
  1789. def test_readlines(self):
  1790. self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
  1791. os.linesep), 'ascii'))
  1792. self._test_readlines(mailbox._ProxyFile(self._file))
  1793. def test_iteration(self):
  1794. self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
  1795. os.linesep), 'ascii'))
  1796. self._test_iteration(mailbox._ProxyFile(self._file))
  1797. def test_seek_and_tell(self):
  1798. self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
  1799. self._test_seek_and_tell(mailbox._ProxyFile(self._file))
  1800. def test_close(self):
  1801. self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
  1802. self._test_close(mailbox._ProxyFile(self._file))
  1803. class TestPartialFile(TestProxyFileBase, unittest.TestCase):
  1804. def setUp(self):
  1805. self._path = os_helper.TESTFN
  1806. self._file = open(self._path, 'wb+')
  1807. def tearDown(self):
  1808. self._file.close()
  1809. self._delete_recursively(self._path)
  1810. def test_initialize(self):
  1811. # Initialize and check position
  1812. self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii'))
  1813. pos = self._file.tell()
  1814. proxy = mailbox._PartialFile(self._file, 2, 5)
  1815. self.assertEqual(proxy.tell(), 0)
  1816. self.assertEqual(self._file.tell(), pos)
  1817. def test_read(self):
  1818. self._file.write(bytes('***bar***', 'ascii'))
  1819. self._test_read(mailbox._PartialFile(self._file, 3, 6))
  1820. def test_readline(self):
  1821. self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' %
  1822. (os.linesep, os.linesep, os.linesep), 'ascii'))
  1823. self._test_readline(mailbox._PartialFile(self._file, 5,
  1824. 18 + 3 * len(os.linesep)))
  1825. def test_readlines(self):
  1826. self._file.write(bytes('foo%sbar%sfred%sbob?????' %
  1827. (os.linesep, os.linesep, os.linesep), 'ascii'))
  1828. self._test_readlines(mailbox._PartialFile(self._file, 0,
  1829. 13 + 3 * len(os.linesep)))
  1830. def test_iteration(self):
  1831. self._file.write(bytes('____foo%sbar%sfred%sbob####' %
  1832. (os.linesep, os.linesep, os.linesep), 'ascii'))
  1833. self._test_iteration(mailbox._PartialFile(self._file, 4,
  1834. 17 + 3 * len(os.linesep)))
  1835. def test_seek_and_tell(self):
  1836. self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii'))
  1837. self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
  1838. 9 + 2 * len(os.linesep)))
  1839. def test_close(self):
  1840. self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii'))
  1841. self._test_close(mailbox._PartialFile(self._file, 1,
  1842. 6 + 3 * len(os.linesep)))
  1843. ## Start: tests from the original module (for backward compatibility).
  1844. FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"
  1845. DUMMY_MESSAGE = """\
  1846. From: some.body@dummy.domain
  1847. To: me@my.domain
  1848. Subject: Simple Test
  1849. This is a dummy message.
  1850. """
  1851. class MaildirTestCase(unittest.TestCase):
  1852. def setUp(self):
  1853. # create a new maildir mailbox to work with:
  1854. self._dir = os_helper.TESTFN
  1855. if os.path.isdir(self._dir):
  1856. os_helper.rmtree(self._dir)
  1857. elif os.path.isfile(self._dir):
  1858. os_helper.unlink(self._dir)
  1859. os.mkdir(self._dir)
  1860. os.mkdir(os.path.join(self._dir, "cur"))
  1861. os.mkdir(os.path.join(self._dir, "tmp"))
  1862. os.mkdir(os.path.join(self._dir, "new"))
  1863. self._counter = 1
  1864. self._msgfiles = []
  1865. def tearDown(self):
  1866. list(map(os.unlink, self._msgfiles))
  1867. os_helper.rmdir(os.path.join(self._dir, "cur"))
  1868. os_helper.rmdir(os.path.join(self._dir, "tmp"))
  1869. os_helper.rmdir(os.path.join(self._dir, "new"))
  1870. os_helper.rmdir(self._dir)
  1871. def createMessage(self, dir, mbox=False):
  1872. t = int(time.time() % 1000000)
  1873. pid = self._counter
  1874. self._counter += 1
  1875. filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
  1876. tmpname = os.path.join(self._dir, "tmp", filename)
  1877. newname = os.path.join(self._dir, dir, filename)
  1878. with open(tmpname, "w", encoding="utf-8") as fp:
  1879. self._msgfiles.append(tmpname)
  1880. if mbox:
  1881. fp.write(FROM_)
  1882. fp.write(DUMMY_MESSAGE)
  1883. try:
  1884. os.link(tmpname, newname)
  1885. except (AttributeError, PermissionError):
  1886. with open(newname, "w") as fp:
  1887. fp.write(DUMMY_MESSAGE)
  1888. self._msgfiles.append(newname)
  1889. return tmpname
  1890. def test_empty_maildir(self):
  1891. """Test an empty maildir mailbox"""
  1892. # Test for regression on bug #117490:
  1893. # Make sure the boxes attribute actually gets set.
  1894. self.mbox = mailbox.Maildir(os_helper.TESTFN)
  1895. #self.assertTrue(hasattr(self.mbox, "boxes"))
  1896. #self.assertEqual(len(self.mbox.boxes), 0)
  1897. self.assertIsNone(self.mbox.next())
  1898. self.assertIsNone(self.mbox.next())
  1899. def test_nonempty_maildir_cur(self):
  1900. self.createMessage("cur")
  1901. self.mbox = mailbox.Maildir(os_helper.TESTFN)
  1902. #self.assertEqual(len(self.mbox.boxes), 1)
  1903. self.assertIsNotNone(self.mbox.next())
  1904. self.assertIsNone(self.mbox.next())
  1905. self.assertIsNone(self.mbox.next())
  1906. def test_nonempty_maildir_new(self):
  1907. self.createMessage("new")
  1908. self.mbox = mailbox.Maildir(os_helper.TESTFN)
  1909. #self.assertEqual(len(self.mbox.boxes), 1)
  1910. self.assertIsNotNone(self.mbox.next())
  1911. self.assertIsNone(self.mbox.next())
  1912. self.assertIsNone(self.mbox.next())
  1913. def test_nonempty_maildir_both(self):
  1914. self.createMessage("cur")
  1915. self.createMessage("new")
  1916. self.mbox = mailbox.Maildir(os_helper.TESTFN)
  1917. #self.assertEqual(len(self.mbox.boxes), 2)
  1918. self.assertIsNotNone(self.mbox.next())
  1919. self.assertIsNotNone(self.mbox.next())
  1920. self.assertIsNone(self.mbox.next())
  1921. self.assertIsNone(self.mbox.next())
  1922. ## End: tests from the original module (for backward compatibility).
  1923. _sample_message = """\
  1924. Return-Path: <gkj@gregorykjohnson.com>
  1925. X-Original-To: gkj+person@localhost
  1926. Delivered-To: gkj+person@localhost
  1927. Received: from localhost (localhost [127.0.0.1])
  1928. by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
  1929. for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
  1930. Delivered-To: gkj@sundance.gregorykjohnson.com
  1931. Received: from localhost [127.0.0.1]
  1932. by localhost with POP3 (fetchmail-6.2.5)
  1933. for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
  1934. Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
  1935. by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
  1936. for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
  1937. Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
  1938. id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
  1939. Date: Wed, 13 Jul 2005 17:23:11 -0400
  1940. From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
  1941. To: gkj@gregorykjohnson.com
  1942. Subject: Sample message
  1943. Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
  1944. Mime-Version: 1.0
  1945. Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
  1946. Content-Disposition: inline
  1947. User-Agent: Mutt/1.5.9i
  1948. --NMuMz9nt05w80d4+
  1949. Content-Type: text/plain; charset=us-ascii
  1950. Content-Disposition: inline
  1951. This is a sample message.
  1952. --
  1953. Gregory K. Johnson
  1954. --NMuMz9nt05w80d4+
  1955. Content-Type: application/octet-stream
  1956. Content-Disposition: attachment; filename="text.gz"
  1957. Content-Transfer-Encoding: base64
  1958. H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
  1959. 3FYlAAAA
  1960. --NMuMz9nt05w80d4+--
  1961. """
  1962. _bytes_sample_message = _sample_message.encode('ascii')
  1963. _sample_headers = {
  1964. "Return-Path":"<gkj@gregorykjohnson.com>",
  1965. "X-Original-To":"gkj+person@localhost",
  1966. "Delivered-To":"gkj+person@localhost",
  1967. "Received":"""from localhost (localhost [127.0.0.1])
  1968. by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
  1969. for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
  1970. "Delivered-To":"gkj@sundance.gregorykjohnson.com",
  1971. "Received":"""from localhost [127.0.0.1]
  1972. by localhost with POP3 (fetchmail-6.2.5)
  1973. for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
  1974. "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
  1975. by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
  1976. for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
  1977. "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
  1978. id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
  1979. "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
  1980. "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
  1981. "To":"gkj@gregorykjohnson.com",
  1982. "Subject":"Sample message",
  1983. "Mime-Version":"1.0",
  1984. "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
  1985. "Content-Disposition":"inline",
  1986. "User-Agent": "Mutt/1.5.9i" }
  1987. _sample_payloads = ("""This is a sample message.
  1988. --
  1989. Gregory K. Johnson
  1990. """,
  1991. """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
  1992. 3FYlAAAA
  1993. """)
  1994. class MiscTestCase(unittest.TestCase):
  1995. def test__all__(self):
  1996. support.check__all__(self, mailbox,
  1997. not_exported={"linesep", "fcntl"})
  1998. def tearDownModule():
  1999. support.reap_children()
  2000. if __name__ == '__main__':
  2001. unittest.main()