# test_bz2.py (37 KB)
  1. from test import support
  2. from test.support import bigmemtest, _4G
  3. import array
  4. import unittest
  5. from io import BytesIO, DEFAULT_BUFFER_SIZE
  6. import os
  7. import pickle
  8. import glob
  9. import tempfile
  10. import pathlib
  11. import random
  12. import shutil
  13. import subprocess
  14. import threading
  15. from test.support import import_helper
  16. from test.support import threading_helper
  17. from test.support.os_helper import unlink
  18. import _compression
  19. import sys
  20. # Skip tests if the bz2 module doesn't exist.
  21. bz2 = import_helper.import_module('bz2')
  22. from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
  23. has_cmdline_bunzip2 = None
  24. def ext_decompress(data):
  25. global has_cmdline_bunzip2
  26. if has_cmdline_bunzip2 is None:
  27. has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
  28. if has_cmdline_bunzip2:
  29. return subprocess.check_output(['bunzip2'], input=data)
  30. else:
  31. return bz2.decompress(data)
  32. class BaseTest(unittest.TestCase):
  33. "Base for other testcases."
  34. TEXT_LINES = [
  35. b'root:x:0:0:root:/root:/bin/bash\n',
  36. b'bin:x:1:1:bin:/bin:\n',
  37. b'daemon:x:2:2:daemon:/sbin:\n',
  38. b'adm:x:3:4:adm:/var/adm:\n',
  39. b'lp:x:4:7:lp:/var/spool/lpd:\n',
  40. b'sync:x:5:0:sync:/sbin:/bin/sync\n',
  41. b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n',
  42. b'halt:x:7:0:halt:/sbin:/sbin/halt\n',
  43. b'mail:x:8:12:mail:/var/spool/mail:\n',
  44. b'news:x:9:13:news:/var/spool/news:\n',
  45. b'uucp:x:10:14:uucp:/var/spool/uucp:\n',
  46. b'operator:x:11:0:operator:/root:\n',
  47. b'games:x:12:100:games:/usr/games:\n',
  48. b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n',
  49. b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
  50. b'nobody:x:65534:65534:Nobody:/home:\n',
  51. b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
  52. b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
  53. b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
  54. b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
  55. b'www:x:103:104::/var/www:/bin/false\n',
  56. ]
  57. TEXT = b''.join(TEXT_LINES)
  58. DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
  59. EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
  60. BAD_DATA = b'this is not a valid bzip2 file'
  61. # Some tests need more than one block of uncompressed data. Since one block
  62. # is at least 100,000 bytes, we gather some data dynamically and compress it.
  63. # Note that this assumes that compression works correctly, so we cannot
  64. # simply use the bigger test data for all tests.
  65. test_size = 0
  66. BIG_TEXT = bytearray(128*1024)
  67. for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
  68. with open(fname, 'rb') as fh:
  69. test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
  70. if test_size > 128*1024:
  71. break
  72. BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)
  73. def setUp(self):
  74. fd, self.filename = tempfile.mkstemp()
  75. os.close(fd)
  76. def tearDown(self):
  77. unlink(self.filename)
  78. class BZ2FileTest(BaseTest):
  79. "Test the BZ2File class."
  80. def createTempFile(self, streams=1, suffix=b""):
  81. with open(self.filename, "wb") as f:
  82. f.write(self.DATA * streams)
  83. f.write(suffix)
  84. def testBadArgs(self):
  85. self.assertRaises(TypeError, BZ2File, 123.456)
  86. self.assertRaises(ValueError, BZ2File, os.devnull, "z")
  87. self.assertRaises(ValueError, BZ2File, os.devnull, "rx")
  88. self.assertRaises(ValueError, BZ2File, os.devnull, "rbt")
  89. self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=0)
  90. self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=10)
  91. # compresslevel is keyword-only
  92. self.assertRaises(TypeError, BZ2File, os.devnull, "r", 3)
  93. def testRead(self):
  94. self.createTempFile()
  95. with BZ2File(self.filename) as bz2f:
  96. self.assertRaises(TypeError, bz2f.read, float())
  97. self.assertEqual(bz2f.read(), self.TEXT)
  98. def testReadBadFile(self):
  99. self.createTempFile(streams=0, suffix=self.BAD_DATA)
  100. with BZ2File(self.filename) as bz2f:
  101. self.assertRaises(OSError, bz2f.read)
  102. def testReadMultiStream(self):
  103. self.createTempFile(streams=5)
  104. with BZ2File(self.filename) as bz2f:
  105. self.assertRaises(TypeError, bz2f.read, float())
  106. self.assertEqual(bz2f.read(), self.TEXT * 5)
  107. def testReadMonkeyMultiStream(self):
  108. # Test BZ2File.read() on a multi-stream archive where a stream
  109. # boundary coincides with the end of the raw read buffer.
  110. buffer_size = _compression.BUFFER_SIZE
  111. _compression.BUFFER_SIZE = len(self.DATA)
  112. try:
  113. self.createTempFile(streams=5)
  114. with BZ2File(self.filename) as bz2f:
  115. self.assertRaises(TypeError, bz2f.read, float())
  116. self.assertEqual(bz2f.read(), self.TEXT * 5)
  117. finally:
  118. _compression.BUFFER_SIZE = buffer_size
  119. def testReadTrailingJunk(self):
  120. self.createTempFile(suffix=self.BAD_DATA)
  121. with BZ2File(self.filename) as bz2f:
  122. self.assertEqual(bz2f.read(), self.TEXT)
  123. def testReadMultiStreamTrailingJunk(self):
  124. self.createTempFile(streams=5, suffix=self.BAD_DATA)
  125. with BZ2File(self.filename) as bz2f:
  126. self.assertEqual(bz2f.read(), self.TEXT * 5)
  127. def testRead0(self):
  128. self.createTempFile()
  129. with BZ2File(self.filename) as bz2f:
  130. self.assertRaises(TypeError, bz2f.read, float())
  131. self.assertEqual(bz2f.read(0), b"")
  132. def testReadChunk10(self):
  133. self.createTempFile()
  134. with BZ2File(self.filename) as bz2f:
  135. text = b''
  136. while True:
  137. str = bz2f.read(10)
  138. if not str:
  139. break
  140. text += str
  141. self.assertEqual(text, self.TEXT)
  142. def testReadChunk10MultiStream(self):
  143. self.createTempFile(streams=5)
  144. with BZ2File(self.filename) as bz2f:
  145. text = b''
  146. while True:
  147. str = bz2f.read(10)
  148. if not str:
  149. break
  150. text += str
  151. self.assertEqual(text, self.TEXT * 5)
  152. def testRead100(self):
  153. self.createTempFile()
  154. with BZ2File(self.filename) as bz2f:
  155. self.assertEqual(bz2f.read(100), self.TEXT[:100])
  156. def testPeek(self):
  157. self.createTempFile()
  158. with BZ2File(self.filename) as bz2f:
  159. pdata = bz2f.peek()
  160. self.assertNotEqual(len(pdata), 0)
  161. self.assertTrue(self.TEXT.startswith(pdata))
  162. self.assertEqual(bz2f.read(), self.TEXT)
  163. def testReadInto(self):
  164. self.createTempFile()
  165. with BZ2File(self.filename) as bz2f:
  166. n = 128
  167. b = bytearray(n)
  168. self.assertEqual(bz2f.readinto(b), n)
  169. self.assertEqual(b, self.TEXT[:n])
  170. n = len(self.TEXT) - n
  171. b = bytearray(len(self.TEXT))
  172. self.assertEqual(bz2f.readinto(b), n)
  173. self.assertEqual(b[:n], self.TEXT[-n:])
  174. def testReadLine(self):
  175. self.createTempFile()
  176. with BZ2File(self.filename) as bz2f:
  177. self.assertRaises(TypeError, bz2f.readline, None)
  178. for line in self.TEXT_LINES:
  179. self.assertEqual(bz2f.readline(), line)
  180. def testReadLineMultiStream(self):
  181. self.createTempFile(streams=5)
  182. with BZ2File(self.filename) as bz2f:
  183. self.assertRaises(TypeError, bz2f.readline, None)
  184. for line in self.TEXT_LINES * 5:
  185. self.assertEqual(bz2f.readline(), line)
  186. def testReadLines(self):
  187. self.createTempFile()
  188. with BZ2File(self.filename) as bz2f:
  189. self.assertRaises(TypeError, bz2f.readlines, None)
  190. self.assertEqual(bz2f.readlines(), self.TEXT_LINES)
  191. def testReadLinesMultiStream(self):
  192. self.createTempFile(streams=5)
  193. with BZ2File(self.filename) as bz2f:
  194. self.assertRaises(TypeError, bz2f.readlines, None)
  195. self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5)
  196. def testIterator(self):
  197. self.createTempFile()
  198. with BZ2File(self.filename) as bz2f:
  199. self.assertEqual(list(iter(bz2f)), self.TEXT_LINES)
  200. def testIteratorMultiStream(self):
  201. self.createTempFile(streams=5)
  202. with BZ2File(self.filename) as bz2f:
  203. self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5)
  204. def testClosedIteratorDeadlock(self):
  205. # Issue #3309: Iteration on a closed BZ2File should release the lock.
  206. self.createTempFile()
  207. bz2f = BZ2File(self.filename)
  208. bz2f.close()
  209. self.assertRaises(ValueError, next, bz2f)
  210. # This call will deadlock if the above call failed to release the lock.
  211. self.assertRaises(ValueError, bz2f.readlines)
  212. def testWrite(self):
  213. with BZ2File(self.filename, "w") as bz2f:
  214. self.assertRaises(TypeError, bz2f.write)
  215. bz2f.write(self.TEXT)
  216. with open(self.filename, 'rb') as f:
  217. self.assertEqual(ext_decompress(f.read()), self.TEXT)
  218. def testWriteChunks10(self):
  219. with BZ2File(self.filename, "w") as bz2f:
  220. n = 0
  221. while True:
  222. str = self.TEXT[n*10:(n+1)*10]
  223. if not str:
  224. break
  225. bz2f.write(str)
  226. n += 1
  227. with open(self.filename, 'rb') as f:
  228. self.assertEqual(ext_decompress(f.read()), self.TEXT)
  229. def testWriteNonDefaultCompressLevel(self):
  230. expected = bz2.compress(self.TEXT, compresslevel=5)
  231. with BZ2File(self.filename, "w", compresslevel=5) as bz2f:
  232. bz2f.write(self.TEXT)
  233. with open(self.filename, "rb") as f:
  234. self.assertEqual(f.read(), expected)
  235. def testWriteLines(self):
  236. with BZ2File(self.filename, "w") as bz2f:
  237. self.assertRaises(TypeError, bz2f.writelines)
  238. bz2f.writelines(self.TEXT_LINES)
  239. # Issue #1535500: Calling writelines() on a closed BZ2File
  240. # should raise an exception.
  241. self.assertRaises(ValueError, bz2f.writelines, ["a"])
  242. with open(self.filename, 'rb') as f:
  243. self.assertEqual(ext_decompress(f.read()), self.TEXT)
  244. def testWriteMethodsOnReadOnlyFile(self):
  245. with BZ2File(self.filename, "w") as bz2f:
  246. bz2f.write(b"abc")
  247. with BZ2File(self.filename, "r") as bz2f:
  248. self.assertRaises(OSError, bz2f.write, b"a")
  249. self.assertRaises(OSError, bz2f.writelines, [b"a"])
  250. def testAppend(self):
  251. with BZ2File(self.filename, "w") as bz2f:
  252. self.assertRaises(TypeError, bz2f.write)
  253. bz2f.write(self.TEXT)
  254. with BZ2File(self.filename, "a") as bz2f:
  255. self.assertRaises(TypeError, bz2f.write)
  256. bz2f.write(self.TEXT)
  257. with open(self.filename, 'rb') as f:
  258. self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)
  259. def testSeekForward(self):
  260. self.createTempFile()
  261. with BZ2File(self.filename) as bz2f:
  262. self.assertRaises(TypeError, bz2f.seek)
  263. bz2f.seek(150)
  264. self.assertEqual(bz2f.read(), self.TEXT[150:])
  265. def testSeekForwardAcrossStreams(self):
  266. self.createTempFile(streams=2)
  267. with BZ2File(self.filename) as bz2f:
  268. self.assertRaises(TypeError, bz2f.seek)
  269. bz2f.seek(len(self.TEXT) + 150)
  270. self.assertEqual(bz2f.read(), self.TEXT[150:])
  271. def testSeekBackwards(self):
  272. self.createTempFile()
  273. with BZ2File(self.filename) as bz2f:
  274. bz2f.read(500)
  275. bz2f.seek(-150, 1)
  276. self.assertEqual(bz2f.read(), self.TEXT[500-150:])
  277. def testSeekBackwardsAcrossStreams(self):
  278. self.createTempFile(streams=2)
  279. with BZ2File(self.filename) as bz2f:
  280. readto = len(self.TEXT) + 100
  281. while readto > 0:
  282. readto -= len(bz2f.read(readto))
  283. bz2f.seek(-150, 1)
  284. self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT)
  285. def testSeekBackwardsFromEnd(self):
  286. self.createTempFile()
  287. with BZ2File(self.filename) as bz2f:
  288. bz2f.seek(-150, 2)
  289. self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
  290. def testSeekBackwardsFromEndAcrossStreams(self):
  291. self.createTempFile(streams=2)
  292. with BZ2File(self.filename) as bz2f:
  293. bz2f.seek(-1000, 2)
  294. self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:])
  295. def testSeekPostEnd(self):
  296. self.createTempFile()
  297. with BZ2File(self.filename) as bz2f:
  298. bz2f.seek(150000)
  299. self.assertEqual(bz2f.tell(), len(self.TEXT))
  300. self.assertEqual(bz2f.read(), b"")
  301. def testSeekPostEndMultiStream(self):
  302. self.createTempFile(streams=5)
  303. with BZ2File(self.filename) as bz2f:
  304. bz2f.seek(150000)
  305. self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
  306. self.assertEqual(bz2f.read(), b"")
  307. def testSeekPostEndTwice(self):
  308. self.createTempFile()
  309. with BZ2File(self.filename) as bz2f:
  310. bz2f.seek(150000)
  311. bz2f.seek(150000)
  312. self.assertEqual(bz2f.tell(), len(self.TEXT))
  313. self.assertEqual(bz2f.read(), b"")
  314. def testSeekPostEndTwiceMultiStream(self):
  315. self.createTempFile(streams=5)
  316. with BZ2File(self.filename) as bz2f:
  317. bz2f.seek(150000)
  318. bz2f.seek(150000)
  319. self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
  320. self.assertEqual(bz2f.read(), b"")
  321. def testSeekPreStart(self):
  322. self.createTempFile()
  323. with BZ2File(self.filename) as bz2f:
  324. bz2f.seek(-150)
  325. self.assertEqual(bz2f.tell(), 0)
  326. self.assertEqual(bz2f.read(), self.TEXT)
  327. def testSeekPreStartMultiStream(self):
  328. self.createTempFile(streams=2)
  329. with BZ2File(self.filename) as bz2f:
  330. bz2f.seek(-150)
  331. self.assertEqual(bz2f.tell(), 0)
  332. self.assertEqual(bz2f.read(), self.TEXT * 2)
  333. def testFileno(self):
  334. self.createTempFile()
  335. with open(self.filename, 'rb') as rawf:
  336. bz2f = BZ2File(rawf)
  337. try:
  338. self.assertEqual(bz2f.fileno(), rawf.fileno())
  339. finally:
  340. bz2f.close()
  341. self.assertRaises(ValueError, bz2f.fileno)
  342. def testSeekable(self):
  343. bz2f = BZ2File(BytesIO(self.DATA))
  344. try:
  345. self.assertTrue(bz2f.seekable())
  346. bz2f.read()
  347. self.assertTrue(bz2f.seekable())
  348. finally:
  349. bz2f.close()
  350. self.assertRaises(ValueError, bz2f.seekable)
  351. bz2f = BZ2File(BytesIO(), "w")
  352. try:
  353. self.assertFalse(bz2f.seekable())
  354. finally:
  355. bz2f.close()
  356. self.assertRaises(ValueError, bz2f.seekable)
  357. src = BytesIO(self.DATA)
  358. src.seekable = lambda: False
  359. bz2f = BZ2File(src)
  360. try:
  361. self.assertFalse(bz2f.seekable())
  362. finally:
  363. bz2f.close()
  364. self.assertRaises(ValueError, bz2f.seekable)
  365. def testReadable(self):
  366. bz2f = BZ2File(BytesIO(self.DATA))
  367. try:
  368. self.assertTrue(bz2f.readable())
  369. bz2f.read()
  370. self.assertTrue(bz2f.readable())
  371. finally:
  372. bz2f.close()
  373. self.assertRaises(ValueError, bz2f.readable)
  374. bz2f = BZ2File(BytesIO(), "w")
  375. try:
  376. self.assertFalse(bz2f.readable())
  377. finally:
  378. bz2f.close()
  379. self.assertRaises(ValueError, bz2f.readable)
  380. def testWritable(self):
  381. bz2f = BZ2File(BytesIO(self.DATA))
  382. try:
  383. self.assertFalse(bz2f.writable())
  384. bz2f.read()
  385. self.assertFalse(bz2f.writable())
  386. finally:
  387. bz2f.close()
  388. self.assertRaises(ValueError, bz2f.writable)
  389. bz2f = BZ2File(BytesIO(), "w")
  390. try:
  391. self.assertTrue(bz2f.writable())
  392. finally:
  393. bz2f.close()
  394. self.assertRaises(ValueError, bz2f.writable)
  395. def testOpenDel(self):
  396. self.createTempFile()
  397. for i in range(10000):
  398. o = BZ2File(self.filename)
  399. del o
  400. def testOpenNonexistent(self):
  401. self.assertRaises(OSError, BZ2File, "/non/existent")
  402. def testReadlinesNoNewline(self):
  403. # Issue #1191043: readlines() fails on a file containing no newline.
  404. data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
  405. with open(self.filename, "wb") as f:
  406. f.write(data)
  407. with BZ2File(self.filename) as bz2f:
  408. lines = bz2f.readlines()
  409. self.assertEqual(lines, [b'Test'])
  410. with BZ2File(self.filename) as bz2f:
  411. xlines = list(bz2f.readlines())
  412. self.assertEqual(xlines, [b'Test'])
  413. def testContextProtocol(self):
  414. f = None
  415. with BZ2File(self.filename, "wb") as f:
  416. f.write(b"xxx")
  417. f = BZ2File(self.filename, "rb")
  418. f.close()
  419. try:
  420. with f:
  421. pass
  422. except ValueError:
  423. pass
  424. else:
  425. self.fail("__enter__ on a closed file didn't raise an exception")
  426. try:
  427. with BZ2File(self.filename, "wb") as f:
  428. 1/0
  429. except ZeroDivisionError:
  430. pass
  431. else:
  432. self.fail("1/0 didn't raise an exception")
  433. @threading_helper.requires_working_threading()
  434. def testThreading(self):
  435. # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
  436. data = b"1" * 2**20
  437. nthreads = 10
  438. with BZ2File(self.filename, 'wb') as f:
  439. def comp():
  440. for i in range(5):
  441. f.write(data)
  442. threads = [threading.Thread(target=comp) for i in range(nthreads)]
  443. with threading_helper.start_threads(threads):
  444. pass
  445. def testMixedIterationAndReads(self):
  446. self.createTempFile()
  447. linelen = len(self.TEXT_LINES[0])
  448. halflen = linelen // 2
  449. with BZ2File(self.filename) as bz2f:
  450. bz2f.read(halflen)
  451. self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
  452. self.assertEqual(bz2f.read(), self.TEXT[linelen:])
  453. with BZ2File(self.filename) as bz2f:
  454. bz2f.readline()
  455. self.assertEqual(next(bz2f), self.TEXT_LINES[1])
  456. self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
  457. with BZ2File(self.filename) as bz2f:
  458. bz2f.readlines()
  459. self.assertRaises(StopIteration, next, bz2f)
  460. self.assertEqual(bz2f.readlines(), [])
  461. def testMultiStreamOrdering(self):
  462. # Test the ordering of streams when reading a multi-stream archive.
  463. data1 = b"foo" * 1000
  464. data2 = b"bar" * 1000
  465. with BZ2File(self.filename, "w") as bz2f:
  466. bz2f.write(data1)
  467. with BZ2File(self.filename, "a") as bz2f:
  468. bz2f.write(data2)
  469. with BZ2File(self.filename) as bz2f:
  470. self.assertEqual(bz2f.read(), data1 + data2)
  471. def testOpenBytesFilename(self):
  472. str_filename = self.filename
  473. try:
  474. bytes_filename = str_filename.encode("ascii")
  475. except UnicodeEncodeError:
  476. self.skipTest("Temporary file name needs to be ASCII")
  477. with BZ2File(bytes_filename, "wb") as f:
  478. f.write(self.DATA)
  479. with BZ2File(bytes_filename, "rb") as f:
  480. self.assertEqual(f.read(), self.DATA)
  481. # Sanity check that we are actually operating on the right file.
  482. with BZ2File(str_filename, "rb") as f:
  483. self.assertEqual(f.read(), self.DATA)
  484. def testOpenPathLikeFilename(self):
  485. filename = pathlib.Path(self.filename)
  486. with BZ2File(filename, "wb") as f:
  487. f.write(self.DATA)
  488. with BZ2File(filename, "rb") as f:
  489. self.assertEqual(f.read(), self.DATA)
  490. def testDecompressLimited(self):
  491. """Decompressed data buffering should be limited"""
  492. bomb = bz2.compress(b'\0' * int(2e6), compresslevel=9)
  493. self.assertLess(len(bomb), _compression.BUFFER_SIZE)
  494. decomp = BZ2File(BytesIO(bomb))
  495. self.assertEqual(decomp.read(1), b'\0')
  496. max_decomp = 1 + DEFAULT_BUFFER_SIZE
  497. self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
  498. "Excessive amount of data was decompressed")
  499. # Tests for a BZ2File wrapping another file object:
  500. def testReadBytesIO(self):
  501. with BytesIO(self.DATA) as bio:
  502. with BZ2File(bio) as bz2f:
  503. self.assertRaises(TypeError, bz2f.read, float())
  504. self.assertEqual(bz2f.read(), self.TEXT)
  505. self.assertFalse(bio.closed)
  506. def testPeekBytesIO(self):
  507. with BytesIO(self.DATA) as bio:
  508. with BZ2File(bio) as bz2f:
  509. pdata = bz2f.peek()
  510. self.assertNotEqual(len(pdata), 0)
  511. self.assertTrue(self.TEXT.startswith(pdata))
  512. self.assertEqual(bz2f.read(), self.TEXT)
  513. def testWriteBytesIO(self):
  514. with BytesIO() as bio:
  515. with BZ2File(bio, "w") as bz2f:
  516. self.assertRaises(TypeError, bz2f.write)
  517. bz2f.write(self.TEXT)
  518. self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
  519. self.assertFalse(bio.closed)
  520. def testSeekForwardBytesIO(self):
  521. with BytesIO(self.DATA) as bio:
  522. with BZ2File(bio) as bz2f:
  523. self.assertRaises(TypeError, bz2f.seek)
  524. bz2f.seek(150)
  525. self.assertEqual(bz2f.read(), self.TEXT[150:])
  526. def testSeekBackwardsBytesIO(self):
  527. with BytesIO(self.DATA) as bio:
  528. with BZ2File(bio) as bz2f:
  529. bz2f.read(500)
  530. bz2f.seek(-150, 1)
  531. self.assertEqual(bz2f.read(), self.TEXT[500-150:])
  532. def test_read_truncated(self):
  533. # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
  534. truncated = self.DATA[:-10]
  535. with BZ2File(BytesIO(truncated)) as f:
  536. self.assertRaises(EOFError, f.read)
  537. with BZ2File(BytesIO(truncated)) as f:
  538. self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
  539. self.assertRaises(EOFError, f.read, 1)
  540. # Incomplete 4-byte file header, and block header of at least 146 bits.
  541. for i in range(22):
  542. with BZ2File(BytesIO(truncated[:i])) as f:
  543. self.assertRaises(EOFError, f.read, 1)
  544. def test_issue44439(self):
  545. q = array.array('Q', [1, 2, 3, 4, 5])
  546. LENGTH = len(q) * q.itemsize
  547. with BZ2File(BytesIO(), 'w') as f:
  548. self.assertEqual(f.write(q), LENGTH)
  549. self.assertEqual(f.tell(), LENGTH)
  550. class BZ2CompressorTest(BaseTest):
  551. def testCompress(self):
  552. bz2c = BZ2Compressor()
  553. self.assertRaises(TypeError, bz2c.compress)
  554. data = bz2c.compress(self.TEXT)
  555. data += bz2c.flush()
  556. self.assertEqual(ext_decompress(data), self.TEXT)
  557. def testCompressEmptyString(self):
  558. bz2c = BZ2Compressor()
  559. data = bz2c.compress(b'')
  560. data += bz2c.flush()
  561. self.assertEqual(data, self.EMPTY_DATA)
  562. def testCompressChunks10(self):
  563. bz2c = BZ2Compressor()
  564. n = 0
  565. data = b''
  566. while True:
  567. str = self.TEXT[n*10:(n+1)*10]
  568. if not str:
  569. break
  570. data += bz2c.compress(str)
  571. n += 1
  572. data += bz2c.flush()
  573. self.assertEqual(ext_decompress(data), self.TEXT)
  574. @support.skip_if_pgo_task
  575. @bigmemtest(size=_4G + 100, memuse=2)
  576. def testCompress4G(self, size):
  577. # "Test BZ2Compressor.compress()/flush() with >4GiB input"
  578. bz2c = BZ2Compressor()
  579. data = b"x" * size
  580. try:
  581. compressed = bz2c.compress(data)
  582. compressed += bz2c.flush()
  583. finally:
  584. data = None # Release memory
  585. data = bz2.decompress(compressed)
  586. try:
  587. self.assertEqual(len(data), size)
  588. self.assertEqual(len(data.strip(b"x")), 0)
  589. finally:
  590. data = None
  591. def testPickle(self):
  592. for proto in range(pickle.HIGHEST_PROTOCOL + 1):
  593. with self.assertRaises(TypeError):
  594. pickle.dumps(BZ2Compressor(), proto)
  595. class BZ2DecompressorTest(BaseTest):
  596. def test_Constructor(self):
  597. self.assertRaises(TypeError, BZ2Decompressor, 42)
  598. def testDecompress(self):
  599. bz2d = BZ2Decompressor()
  600. self.assertRaises(TypeError, bz2d.decompress)
  601. text = bz2d.decompress(self.DATA)
  602. self.assertEqual(text, self.TEXT)
  603. def testDecompressChunks10(self):
  604. bz2d = BZ2Decompressor()
  605. text = b''
  606. n = 0
  607. while True:
  608. str = self.DATA[n*10:(n+1)*10]
  609. if not str:
  610. break
  611. text += bz2d.decompress(str)
  612. n += 1
  613. self.assertEqual(text, self.TEXT)
  614. def testDecompressUnusedData(self):
  615. bz2d = BZ2Decompressor()
  616. unused_data = b"this is unused data"
  617. text = bz2d.decompress(self.DATA+unused_data)
  618. self.assertEqual(text, self.TEXT)
  619. self.assertEqual(bz2d.unused_data, unused_data)
  620. def testEOFError(self):
  621. bz2d = BZ2Decompressor()
  622. text = bz2d.decompress(self.DATA)
  623. self.assertRaises(EOFError, bz2d.decompress, b"anything")
  624. self.assertRaises(EOFError, bz2d.decompress, b"")
  625. @support.skip_if_pgo_task
  626. @bigmemtest(size=_4G + 100, memuse=3.3)
  627. def testDecompress4G(self, size):
  628. # "Test BZ2Decompressor.decompress() with >4GiB input"
  629. blocksize = 10 * 1024 * 1024
  630. block = random.randbytes(blocksize)
  631. try:
  632. data = block * (size // blocksize + 1)
  633. compressed = bz2.compress(data)
  634. bz2d = BZ2Decompressor()
  635. decompressed = bz2d.decompress(compressed)
  636. self.assertTrue(decompressed == data)
  637. finally:
  638. data = None
  639. compressed = None
  640. decompressed = None
  641. def testPickle(self):
  642. for proto in range(pickle.HIGHEST_PROTOCOL + 1):
  643. with self.assertRaises(TypeError):
  644. pickle.dumps(BZ2Decompressor(), proto)
  645. def testDecompressorChunksMaxsize(self):
  646. bzd = BZ2Decompressor()
  647. max_length = 100
  648. out = []
  649. # Feed some input
  650. len_ = len(self.BIG_DATA) - 64
  651. out.append(bzd.decompress(self.BIG_DATA[:len_],
  652. max_length=max_length))
  653. self.assertFalse(bzd.needs_input)
  654. self.assertEqual(len(out[-1]), max_length)
  655. # Retrieve more data without providing more input
  656. out.append(bzd.decompress(b'', max_length=max_length))
  657. self.assertFalse(bzd.needs_input)
  658. self.assertEqual(len(out[-1]), max_length)
  659. # Retrieve more data while providing more input
  660. out.append(bzd.decompress(self.BIG_DATA[len_:],
  661. max_length=max_length))
  662. self.assertLessEqual(len(out[-1]), max_length)
  663. # Retrieve remaining uncompressed data
  664. while not bzd.eof:
  665. out.append(bzd.decompress(b'', max_length=max_length))
  666. self.assertLessEqual(len(out[-1]), max_length)
  667. out = b"".join(out)
  668. self.assertEqual(out, self.BIG_TEXT)
  669. self.assertEqual(bzd.unused_data, b"")
  670. def test_decompressor_inputbuf_1(self):
  671. # Test reusing input buffer after moving existing
  672. # contents to beginning
  673. bzd = BZ2Decompressor()
  674. out = []
  675. # Create input buffer and fill it
  676. self.assertEqual(bzd.decompress(self.DATA[:100],
  677. max_length=0), b'')
  678. # Retrieve some results, freeing capacity at beginning
  679. # of input buffer
  680. out.append(bzd.decompress(b'', 2))
  681. # Add more data that fits into input buffer after
  682. # moving existing data to beginning
  683. out.append(bzd.decompress(self.DATA[100:105], 15))
  684. # Decompress rest of data
  685. out.append(bzd.decompress(self.DATA[105:]))
  686. self.assertEqual(b''.join(out), self.TEXT)
  687. def test_decompressor_inputbuf_2(self):
  688. # Test reusing input buffer by appending data at the
  689. # end right away
  690. bzd = BZ2Decompressor()
  691. out = []
  692. # Create input buffer and empty it
  693. self.assertEqual(bzd.decompress(self.DATA[:200],
  694. max_length=0), b'')
  695. out.append(bzd.decompress(b''))
  696. # Fill buffer with new data
  697. out.append(bzd.decompress(self.DATA[200:280], 2))
  698. # Append some more data, not enough to require resize
  699. out.append(bzd.decompress(self.DATA[280:300], 2))
  700. # Decompress rest of data
  701. out.append(bzd.decompress(self.DATA[300:]))
  702. self.assertEqual(b''.join(out), self.TEXT)
  def test_decompressor_inputbuf_3(self):
      # Scenario: the input buffer gets reused after it was extended.
      decomp = BZ2Decompressor()
      stream = self.DATA

      pieces = [
          # Create an almost full input buffer.
          decomp.decompress(stream[:200], 5),
          # Add even more data to it, which requires a resize.
          decomp.decompress(stream[200:300], 5),
          # Decompress the rest of the data.
          decomp.decompress(stream[300:]),
      ]
      self.assertEqual(b''.join(pieces), self.TEXT)
  def test_failure(self):
      # Feeding invalid data must raise, and must keep raising on a
      # repeated call: previously, a second call could crash due to
      # internal inconsistency.
      decomp = BZ2Decompressor()
      for _ in range(2):
          with self.assertRaises(Exception):
              decomp.decompress(self.BAD_DATA * 30)
  @support.refcount_test
  def test_refleaks_in___init__(self):
      # Calling __init__() repeatedly on one decompressor must leave the
      # interpreter's total reference count (nearly) unchanged.
      total_refs = support.get_attribute(sys, 'gettotalrefcount')
      decomp = BZ2Decompressor()

      baseline = total_refs()
      for _ in range(100):
          decomp.__init__()
      drift = total_refs() - baseline

      # A small tolerance is allowed around zero.
      self.assertAlmostEqual(drift, 0, delta=10)
  class CompressDecompressTest(BaseTest):
      "Test the one-shot compress() and decompress() functions."

      def testCompress(self):
          # Output of bz2.compress() must be readable by ext_decompress().
          compressed = bz2.compress(self.TEXT)
          self.assertEqual(ext_decompress(compressed), self.TEXT)

      def testCompressEmptyString(self):
          # Compressing nothing yields the EMPTY_DATA fixture.
          self.assertEqual(bz2.compress(b''), self.EMPTY_DATA)

      def testDecompress(self):
          self.assertEqual(bz2.decompress(self.DATA), self.TEXT)

      def testDecompressEmpty(self):
          # Empty input is accepted and produces empty output.
          self.assertEqual(bz2.decompress(b""), b"")

      def testDecompressToEmptyString(self):
          self.assertEqual(bz2.decompress(self.EMPTY_DATA), b'')

      def testDecompressIncomplete(self):
          # A stream with its last 10 bytes cut off is rejected.
          with self.assertRaises(ValueError):
              bz2.decompress(self.DATA[:-10])

      def testDecompressBadData(self):
          with self.assertRaises(OSError):
              bz2.decompress(self.BAD_DATA)

      def testDecompressMultiStream(self):
          # Concatenated streams decompress to the concatenated texts.
          repeated = self.DATA * 5
          self.assertEqual(bz2.decompress(repeated), self.TEXT * 5)

      def testDecompressTrailingJunk(self):
          # Invalid bytes after a complete stream do not affect the result.
          padded = self.DATA + self.BAD_DATA
          self.assertEqual(bz2.decompress(padded), self.TEXT)

      def testDecompressMultiStreamTrailingJunk(self):
          padded = self.DATA * 5 + self.BAD_DATA
          self.assertEqual(bz2.decompress(padded), self.TEXT * 5)
  class OpenTest(BaseTest):
      "Test the open function."

      def open(self, *args, **kwargs):
          # Every test in this class opens bz2 files through this wrapper,
          # which forwards all arguments to bz2.open() unchanged.
          return bz2.open(*args, **kwargs)

      def test_binary_modes(self):
          # Write, read back and append using explicit binary mode strings.
          path = self.filename
          payload = self.TEXT
          for write_mode in ("wb", "xb"):
              if write_mode == "xb":
                  # Remove the file left by the previous iteration before
                  # opening in "x" mode.
                  unlink(path)
              with self.open(path, write_mode) as bzf:
                  bzf.write(payload)
              with open(path, "rb") as raw:
                  self.assertEqual(ext_decompress(raw.read()), payload)
              with self.open(path, "rb") as bzf:
                  self.assertEqual(bzf.read(), payload)
              with self.open(path, "ab") as bzf:
                  bzf.write(payload)
              with open(path, "rb") as raw:
                  self.assertEqual(ext_decompress(raw.read()), payload * 2)

      def test_implicit_binary_modes(self):
          # Test implicit binary modes (no "b" or "t" in mode string).
          path = self.filename
          payload = self.TEXT
          for write_mode in ("w", "x"):
              if write_mode == "x":
                  unlink(path)
              with self.open(path, write_mode) as bzf:
                  bzf.write(payload)
              with open(path, "rb") as raw:
                  self.assertEqual(ext_decompress(raw.read()), payload)
              with self.open(path, "r") as bzf:
                  self.assertEqual(bzf.read(), payload)
              with self.open(path, "a") as bzf:
                  bzf.write(payload)
              with open(path, "rb") as raw:
                  self.assertEqual(ext_decompress(raw.read()), payload * 2)

      def test_text_modes(self):
          # Text written through the "t" modes lands on disk with "\n"
          # replaced by os.linesep, and reads back as the original text.
          path = self.filename
          plain = self.TEXT.decode("ascii")
          plain_native = plain.replace("\n", os.linesep)
          for write_mode in ("wt", "xt"):
              if write_mode == "xt":
                  unlink(path)
              with self.open(path, write_mode, encoding="ascii") as bzf:
                  bzf.write(plain)
              with open(path, "rb") as raw:
                  on_disk = ext_decompress(raw.read()).decode("ascii")
                  self.assertEqual(on_disk, plain_native)
              with self.open(path, "rt", encoding="ascii") as bzf:
                  self.assertEqual(bzf.read(), plain)
              with self.open(path, "at", encoding="ascii") as bzf:
                  bzf.write(plain)
              with open(path, "rb") as raw:
                  on_disk = ext_decompress(raw.read()).decode("ascii")
                  self.assertEqual(on_disk, plain_native * 2)

      def test_x_mode(self):
          # Each "x" variant creates a missing file and then refuses to
          # open it again once it exists.
          for mode in ("x", "xb", "xt"):
              unlink(self.filename)
              if "t" in mode:
                  encoding = "utf-8"
              else:
                  encoding = None
              with self.open(self.filename, mode, encoding=encoding):
                  pass
              with self.assertRaises(FileExistsError), \
                      self.open(self.filename, mode):
                  pass

      def test_fileobj(self):
          # An in-memory file object is accepted in place of a path.
          for binary_mode in ("r", "rb"):
              with self.open(BytesIO(self.DATA), binary_mode) as bzf:
                  self.assertEqual(bzf.read(), self.TEXT)
          plain = self.TEXT.decode("ascii")
          with self.open(BytesIO(self.DATA), "rt", encoding="utf-8") as bzf:
              self.assertEqual(bzf.read(), plain)

      def test_bad_params(self):
          # Test invalid parameter combinations.
          for bad_mode in ("wbt", "xbt"):
              with self.assertRaises(ValueError):
                  self.open(self.filename, bad_mode)
          # These keyword arguments are rejected together with mode "rb".
          rejected = (
              {"encoding": "utf-8"},
              {"errors": "ignore"},
              {"newline": "\n"},
          )
          for extra in rejected:
              with self.assertRaises(ValueError):
                  self.open(self.filename, "rb", **extra)

      def test_encoding(self):
          # Test non-default encoding.
          codec = "utf-16-le"
          plain = self.TEXT.decode("ascii")
          plain_native = plain.replace("\n", os.linesep)
          with self.open(self.filename, "wt", encoding=codec) as bzf:
              bzf.write(plain)
          with open(self.filename, "rb") as raw:
              on_disk = ext_decompress(raw.read()).decode(codec)
              self.assertEqual(on_disk, plain_native)
          with self.open(self.filename, "rt", encoding=codec) as bzf:
              self.assertEqual(bzf.read(), plain)

      def test_encoding_error_handler(self):
          # Test with non-default encoding error handler.
          with self.open(self.filename, "wb") as bzf:
              bzf.write(b"foo\xffbar")
          reader = self.open(self.filename, "rt", encoding="ascii",
                             errors="ignore")
          with reader as bzf:
              # The byte that is not valid ASCII is dropped on read.
              self.assertEqual(bzf.read(), "foobar")

      def test_newline(self):
          # Test with explicit newline (universal newline mode disabled).
          plain = self.TEXT.decode("ascii")
          with self.open(self.filename, "wt", encoding="utf-8",
                         newline="\n") as bzf:
              bzf.write(plain)
          with self.open(self.filename, "rt", encoding="utf-8",
                         newline="\r") as bzf:
              # With "\r" as the only line terminator, the whole text is
              # expected back as one line.
              self.assertEqual(bzf.readlines(), [plain])
  def tearDownModule():
      # Module-level teardown: delegate to test.support's reap_children().
      support.reap_children()
  if __name__ == '__main__':
      # Executed as a script: run this module's tests via unittest.
      unittest.main()