# test_file.py -- tests for file objects returned by io.open and _pyio.open.

import sys
import os
import unittest
from array import array
from weakref import proxy

import io
import _pyio as pyio

from test.support import gc_collect
from test.support.os_helper import TESTFN
from test.support import os_helper
from test.support import warnings_helper
from collections import UserList


  13. class AutoFileTests:
  14. # file tests for which a test file is automatically set up
  15. def setUp(self):
  16. self.f = self.open(TESTFN, 'wb')
  17. def tearDown(self):
  18. if self.f:
  19. self.f.close()
  20. os_helper.unlink(TESTFN)
  21. def testWeakRefs(self):
  22. # verify weak references
  23. p = proxy(self.f)
  24. p.write(b'teststring')
  25. self.assertEqual(self.f.tell(), p.tell())
  26. self.f.close()
  27. self.f = None
  28. gc_collect() # For PyPy or other GCs.
  29. self.assertRaises(ReferenceError, getattr, p, 'tell')
  30. def testAttributes(self):
  31. # verify expected attributes exist
  32. f = self.f
  33. f.name # merely shouldn't blow up
  34. f.mode # ditto
  35. f.closed # ditto
  36. def testReadinto(self):
  37. # verify readinto
  38. self.f.write(b'12')
  39. self.f.close()
  40. a = array('b', b'x'*10)
  41. self.f = self.open(TESTFN, 'rb')
  42. n = self.f.readinto(a)
  43. self.assertEqual(b'12', a.tobytes()[:n])
  44. def testReadinto_text(self):
  45. # verify readinto refuses text files
  46. a = array('b', b'x'*10)
  47. self.f.close()
  48. self.f = self.open(TESTFN, encoding="utf-8")
  49. if hasattr(self.f, "readinto"):
  50. self.assertRaises(TypeError, self.f.readinto, a)
  51. def testWritelinesUserList(self):
  52. # verify writelines with instance sequence
  53. l = UserList([b'1', b'2'])
  54. self.f.writelines(l)
  55. self.f.close()
  56. self.f = self.open(TESTFN, 'rb')
  57. buf = self.f.read()
  58. self.assertEqual(buf, b'12')
  59. def testWritelinesIntegers(self):
  60. # verify writelines with integers
  61. self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
  62. def testWritelinesIntegersUserList(self):
  63. # verify writelines with integers in UserList
  64. l = UserList([1,2,3])
  65. self.assertRaises(TypeError, self.f.writelines, l)
  66. def testWritelinesNonString(self):
  67. # verify writelines with non-string object
  68. class NonString:
  69. pass
  70. self.assertRaises(TypeError, self.f.writelines,
  71. [NonString(), NonString()])
  72. def testErrors(self):
  73. f = self.f
  74. self.assertEqual(f.name, TESTFN)
  75. self.assertFalse(f.isatty())
  76. self.assertFalse(f.closed)
  77. if hasattr(f, "readinto"):
  78. self.assertRaises((OSError, TypeError), f.readinto, "")
  79. f.close()
  80. self.assertTrue(f.closed)
  81. def testMethods(self):
  82. methods = [('fileno', ()),
  83. ('flush', ()),
  84. ('isatty', ()),
  85. ('__next__', ()),
  86. ('read', ()),
  87. ('write', (b"",)),
  88. ('readline', ()),
  89. ('readlines', ()),
  90. ('seek', (0,)),
  91. ('tell', ()),
  92. ('write', (b"",)),
  93. ('writelines', ([],)),
  94. ('__iter__', ()),
  95. ]
  96. methods.append(('truncate', ()))
  97. # __exit__ should close the file
  98. self.f.__exit__(None, None, None)
  99. self.assertTrue(self.f.closed)
  100. for methodname, args in methods:
  101. method = getattr(self.f, methodname)
  102. # should raise on closed file
  103. self.assertRaises(ValueError, method, *args)
  104. # file is closed, __exit__ shouldn't do anything
  105. self.assertEqual(self.f.__exit__(None, None, None), None)
  106. # it must also return None if an exception was given
  107. try:
  108. 1/0
  109. except:
  110. self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
  111. def testReadWhenWriting(self):
  112. self.assertRaises(OSError, self.f.read)
  113. class CAutoFileTests(AutoFileTests, unittest.TestCase):
  114. open = io.open
  115. class PyAutoFileTests(AutoFileTests, unittest.TestCase):
  116. open = staticmethod(pyio.open)
  117. class OtherFileTests:
  118. def tearDown(self):
  119. os_helper.unlink(TESTFN)
  120. def testModeStrings(self):
  121. # check invalid mode strings
  122. self.open(TESTFN, 'wb').close()
  123. for mode in ("", "aU", "wU+", "U+", "+U", "rU+"):
  124. try:
  125. f = self.open(TESTFN, mode)
  126. except ValueError:
  127. pass
  128. else:
  129. f.close()
  130. self.fail('%r is an invalid file mode' % mode)
  131. def testStdin(self):
  132. if sys.platform == 'osf1V5':
  133. # This causes the interpreter to exit on OSF1 v5.1.
  134. self.skipTest(
  135. ' sys.stdin.seek(-1) may crash the interpreter on OSF1.'
  136. ' Test manually.')
  137. if not sys.stdin.isatty():
  138. # Issue 14853: stdin becomes seekable when redirected to a file
  139. self.skipTest('stdin must be a TTY in this test')
  140. with self.assertRaises((IOError, ValueError)):
  141. sys.stdin.seek(-1)
  142. with self.assertRaises((IOError, ValueError)):
  143. sys.stdin.truncate()
  144. def testBadModeArgument(self):
  145. # verify that we get a sensible error message for bad mode argument
  146. bad_mode = "qwerty"
  147. try:
  148. f = self.open(TESTFN, bad_mode)
  149. except ValueError as msg:
  150. if msg.args[0] != 0:
  151. s = str(msg)
  152. if TESTFN in s or bad_mode not in s:
  153. self.fail("bad error message for invalid mode: %s" % s)
  154. # if msg.args[0] == 0, we're probably on Windows where there may be
  155. # no obvious way to discover why open() failed.
  156. else:
  157. f.close()
  158. self.fail("no error for invalid mode: %s" % bad_mode)
  159. def _checkBufferSize(self, s):
  160. try:
  161. f = self.open(TESTFN, 'wb', s)
  162. f.write(str(s).encode("ascii"))
  163. f.close()
  164. f.close()
  165. f = self.open(TESTFN, 'rb', s)
  166. d = int(f.read().decode("ascii"))
  167. f.close()
  168. f.close()
  169. except OSError as msg:
  170. self.fail('error setting buffer size %d: %s' % (s, str(msg)))
  171. self.assertEqual(d, s)
  172. def testSetBufferSize(self):
  173. # make sure that explicitly setting the buffer size doesn't cause
  174. # misbehaviour especially with repeated close() calls
  175. for s in (-1, 0, 512):
  176. with warnings_helper.check_no_warnings(self,
  177. message='line buffering',
  178. category=RuntimeWarning):
  179. self._checkBufferSize(s)
  180. # test that attempts to use line buffering in binary mode cause
  181. # a warning
  182. with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
  183. self._checkBufferSize(1)
  184. def testTruncateOnWindows(self):
  185. # SF bug <http://www.python.org/sf/801631>
  186. # "file.truncate fault on windows"
  187. f = self.open(TESTFN, 'wb')
  188. try:
  189. f.write(b'12345678901') # 11 bytes
  190. f.close()
  191. f = self.open(TESTFN,'rb+')
  192. data = f.read(5)
  193. if data != b'12345':
  194. self.fail("Read on file opened for update failed %r" % data)
  195. if f.tell() != 5:
  196. self.fail("File pos after read wrong %d" % f.tell())
  197. f.truncate()
  198. if f.tell() != 5:
  199. self.fail("File pos after ftruncate wrong %d" % f.tell())
  200. f.close()
  201. size = os.path.getsize(TESTFN)
  202. if size != 5:
  203. self.fail("File size after ftruncate wrong %d" % size)
  204. finally:
  205. f.close()
  206. def testIteration(self):
  207. # Test the complex interaction when mixing file-iteration and the
  208. # various read* methods.
  209. dataoffset = 16384
  210. filler = b"ham\n"
  211. assert not dataoffset % len(filler), \
  212. "dataoffset must be multiple of len(filler)"
  213. nchunks = dataoffset // len(filler)
  214. testlines = [
  215. b"spam, spam and eggs\n",
  216. b"eggs, spam, ham and spam\n",
  217. b"saussages, spam, spam and eggs\n",
  218. b"spam, ham, spam and eggs\n",
  219. b"spam, spam, spam, spam, spam, ham, spam\n",
  220. b"wonderful spaaaaaam.\n"
  221. ]
  222. methods = [("readline", ()), ("read", ()), ("readlines", ()),
  223. ("readinto", (array("b", b" "*100),))]
  224. # Prepare the testfile
  225. bag = self.open(TESTFN, "wb")
  226. bag.write(filler * nchunks)
  227. bag.writelines(testlines)
  228. bag.close()
  229. # Test for appropriate errors mixing read* and iteration
  230. for methodname, args in methods:
  231. f = self.open(TESTFN, 'rb')
  232. self.assertEqual(next(f), filler)
  233. meth = getattr(f, methodname)
  234. meth(*args) # This simply shouldn't fail
  235. f.close()
  236. # Test to see if harmless (by accident) mixing of read* and
  237. # iteration still works. This depends on the size of the internal
  238. # iteration buffer (currently 8192,) but we can test it in a
  239. # flexible manner. Each line in the bag o' ham is 4 bytes
  240. # ("h", "a", "m", "\n"), so 4096 lines of that should get us
  241. # exactly on the buffer boundary for any power-of-2 buffersize
  242. # between 4 and 16384 (inclusive).
  243. f = self.open(TESTFN, 'rb')
  244. for i in range(nchunks):
  245. next(f)
  246. testline = testlines.pop(0)
  247. try:
  248. line = f.readline()
  249. except ValueError:
  250. self.fail("readline() after next() with supposedly empty "
  251. "iteration-buffer failed anyway")
  252. if line != testline:
  253. self.fail("readline() after next() with empty buffer "
  254. "failed. Got %r, expected %r" % (line, testline))
  255. testline = testlines.pop(0)
  256. buf = array("b", b"\x00" * len(testline))
  257. try:
  258. f.readinto(buf)
  259. except ValueError:
  260. self.fail("readinto() after next() with supposedly empty "
  261. "iteration-buffer failed anyway")
  262. line = buf.tobytes()
  263. if line != testline:
  264. self.fail("readinto() after next() with empty buffer "
  265. "failed. Got %r, expected %r" % (line, testline))
  266. testline = testlines.pop(0)
  267. try:
  268. line = f.read(len(testline))
  269. except ValueError:
  270. self.fail("read() after next() with supposedly empty "
  271. "iteration-buffer failed anyway")
  272. if line != testline:
  273. self.fail("read() after next() with empty buffer "
  274. "failed. Got %r, expected %r" % (line, testline))
  275. try:
  276. lines = f.readlines()
  277. except ValueError:
  278. self.fail("readlines() after next() with supposedly empty "
  279. "iteration-buffer failed anyway")
  280. if lines != testlines:
  281. self.fail("readlines() after next() with empty buffer "
  282. "failed. Got %r, expected %r" % (line, testline))
  283. f.close()
  284. # Reading after iteration hit EOF shouldn't hurt either
  285. f = self.open(TESTFN, 'rb')
  286. try:
  287. for line in f:
  288. pass
  289. try:
  290. f.readline()
  291. f.readinto(buf)
  292. f.read()
  293. f.readlines()
  294. except ValueError:
  295. self.fail("read* failed after next() consumed file")
  296. finally:
  297. f.close()
  298. class COtherFileTests(OtherFileTests, unittest.TestCase):
  299. open = io.open
  300. class PyOtherFileTests(OtherFileTests, unittest.TestCase):
  301. open = staticmethod(pyio.open)
  302. if __name__ == '__main__':
  303. unittest.main()