test_fileio.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. # Adapted from test_file.py by Daniel Stutzbach
  2. import sys
  3. import os
  4. import io
  5. import errno
  6. import unittest
  7. from array import array
  8. from weakref import proxy
  9. from functools import wraps
  10. from test.support import (
  11. cpython_only, swap_attr, gc_collect, is_emscripten, is_wasi
  12. )
  13. from test.support.os_helper import (TESTFN, TESTFN_UNICODE, make_bad_fd)
  14. from test.support.warnings_helper import check_warnings
  15. from collections import UserList
  16. import _io # C implementation of io
  17. import _pyio # Python implementation of io
  18. class AutoFileTests:
  19. # file tests for which a test file is automatically set up
  20. def setUp(self):
  21. self.f = self.FileIO(TESTFN, 'w')
  22. def tearDown(self):
  23. if self.f:
  24. self.f.close()
  25. os.remove(TESTFN)
  26. def testWeakRefs(self):
  27. # verify weak references
  28. p = proxy(self.f)
  29. p.write(bytes(range(10)))
  30. self.assertEqual(self.f.tell(), p.tell())
  31. self.f.close()
  32. self.f = None
  33. gc_collect() # For PyPy or other GCs.
  34. self.assertRaises(ReferenceError, getattr, p, 'tell')
  35. def testSeekTell(self):
  36. self.f.write(bytes(range(20)))
  37. self.assertEqual(self.f.tell(), 20)
  38. self.f.seek(0)
  39. self.assertEqual(self.f.tell(), 0)
  40. self.f.seek(10)
  41. self.assertEqual(self.f.tell(), 10)
  42. self.f.seek(5, 1)
  43. self.assertEqual(self.f.tell(), 15)
  44. self.f.seek(-5, 1)
  45. self.assertEqual(self.f.tell(), 10)
  46. self.f.seek(-5, 2)
  47. self.assertEqual(self.f.tell(), 15)
  48. def testAttributes(self):
  49. # verify expected attributes exist
  50. f = self.f
  51. self.assertEqual(f.mode, "wb")
  52. self.assertEqual(f.closed, False)
  53. # verify the attributes are readonly
  54. for attr in 'mode', 'closed':
  55. self.assertRaises((AttributeError, TypeError),
  56. setattr, f, attr, 'oops')
  57. @unittest.skipIf(is_wasi, "WASI does not expose st_blksize.")
  58. def testBlksize(self):
  59. # test private _blksize attribute
  60. blksize = io.DEFAULT_BUFFER_SIZE
  61. # try to get preferred blksize from stat.st_blksize, if available
  62. if hasattr(os, 'fstat'):
  63. fst = os.fstat(self.f.fileno())
  64. blksize = getattr(fst, 'st_blksize', blksize)
  65. self.assertEqual(self.f._blksize, blksize)
  66. # verify readinto
  67. def testReadintoByteArray(self):
  68. self.f.write(bytes([1, 2, 0, 255]))
  69. self.f.close()
  70. ba = bytearray(b'abcdefgh')
  71. with self.FileIO(TESTFN, 'r') as f:
  72. n = f.readinto(ba)
  73. self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
  74. self.assertEqual(n, 4)
  75. def _testReadintoMemoryview(self):
  76. self.f.write(bytes([1, 2, 0, 255]))
  77. self.f.close()
  78. m = memoryview(bytearray(b'abcdefgh'))
  79. with self.FileIO(TESTFN, 'r') as f:
  80. n = f.readinto(m)
  81. self.assertEqual(m, b'\x01\x02\x00\xffefgh')
  82. self.assertEqual(n, 4)
  83. m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
  84. with self.FileIO(TESTFN, 'r') as f:
  85. n = f.readinto(m)
  86. self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
  87. self.assertEqual(n, 4)
  88. def _testReadintoArray(self):
  89. self.f.write(bytes([1, 2, 0, 255]))
  90. self.f.close()
  91. a = array('B', b'abcdefgh')
  92. with self.FileIO(TESTFN, 'r') as f:
  93. n = f.readinto(a)
  94. self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
  95. self.assertEqual(n, 4)
  96. a = array('b', b'abcdefgh')
  97. with self.FileIO(TESTFN, 'r') as f:
  98. n = f.readinto(a)
  99. self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
  100. self.assertEqual(n, 4)
  101. a = array('I', b'abcdefgh')
  102. with self.FileIO(TESTFN, 'r') as f:
  103. n = f.readinto(a)
  104. self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
  105. self.assertEqual(n, 4)
  106. def testWritelinesList(self):
  107. l = [b'123', b'456']
  108. self.f.writelines(l)
  109. self.f.close()
  110. self.f = self.FileIO(TESTFN, 'rb')
  111. buf = self.f.read()
  112. self.assertEqual(buf, b'123456')
  113. def testWritelinesUserList(self):
  114. l = UserList([b'123', b'456'])
  115. self.f.writelines(l)
  116. self.f.close()
  117. self.f = self.FileIO(TESTFN, 'rb')
  118. buf = self.f.read()
  119. self.assertEqual(buf, b'123456')
  120. def testWritelinesError(self):
  121. self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
  122. self.assertRaises(TypeError, self.f.writelines, None)
  123. self.assertRaises(TypeError, self.f.writelines, "abc")
  124. def test_none_args(self):
  125. self.f.write(b"hi\nbye\nabc")
  126. self.f.close()
  127. self.f = self.FileIO(TESTFN, 'r')
  128. self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
  129. self.f.seek(0)
  130. self.assertEqual(self.f.readline(None), b"hi\n")
  131. self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
  132. def test_reject(self):
  133. self.assertRaises(TypeError, self.f.write, "Hello!")
  134. def testRepr(self):
  135. self.assertEqual(repr(self.f),
  136. "<%s.FileIO name=%r mode=%r closefd=True>" %
  137. (self.modulename, self.f.name, self.f.mode))
  138. del self.f.name
  139. self.assertEqual(repr(self.f),
  140. "<%s.FileIO fd=%r mode=%r closefd=True>" %
  141. (self.modulename, self.f.fileno(), self.f.mode))
  142. self.f.close()
  143. self.assertEqual(repr(self.f),
  144. "<%s.FileIO [closed]>" % (self.modulename,))
  145. def testReprNoCloseFD(self):
  146. fd = os.open(TESTFN, os.O_RDONLY)
  147. try:
  148. with self.FileIO(fd, 'r', closefd=False) as f:
  149. self.assertEqual(repr(f),
  150. "<%s.FileIO name=%r mode=%r closefd=False>" %
  151. (self.modulename, f.name, f.mode))
  152. finally:
  153. os.close(fd)
  154. def testRecursiveRepr(self):
  155. # Issue #25455
  156. with swap_attr(self.f, 'name', self.f):
  157. with self.assertRaises(RuntimeError):
  158. repr(self.f) # Should not crash
  159. def testErrors(self):
  160. f = self.f
  161. self.assertFalse(f.isatty())
  162. self.assertFalse(f.closed)
  163. #self.assertEqual(f.name, TESTFN)
  164. self.assertRaises(ValueError, f.read, 10) # Open for reading
  165. f.close()
  166. self.assertTrue(f.closed)
  167. f = self.FileIO(TESTFN, 'r')
  168. self.assertRaises(TypeError, f.readinto, "")
  169. self.assertFalse(f.closed)
  170. f.close()
  171. self.assertTrue(f.closed)
  172. def testMethods(self):
  173. methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
  174. 'read', 'readall', 'readline', 'readlines',
  175. 'tell', 'truncate', 'flush']
  176. self.f.close()
  177. self.assertTrue(self.f.closed)
  178. for methodname in methods:
  179. method = getattr(self.f, methodname)
  180. # should raise on closed file
  181. self.assertRaises(ValueError, method)
  182. self.assertRaises(TypeError, self.f.readinto)
  183. self.assertRaises(ValueError, self.f.readinto, bytearray(1))
  184. self.assertRaises(TypeError, self.f.seek)
  185. self.assertRaises(ValueError, self.f.seek, 0)
  186. self.assertRaises(TypeError, self.f.write)
  187. self.assertRaises(ValueError, self.f.write, b'')
  188. self.assertRaises(TypeError, self.f.writelines)
  189. self.assertRaises(ValueError, self.f.writelines, b'')
  190. def testOpendir(self):
  191. # Issue 3703: opening a directory should fill the errno
  192. # Windows always returns "[Errno 13]: Permission denied
  193. # Unix uses fstat and returns "[Errno 21]: Is a directory"
  194. try:
  195. self.FileIO('.', 'r')
  196. except OSError as e:
  197. self.assertNotEqual(e.errno, 0)
  198. self.assertEqual(e.filename, ".")
  199. else:
  200. self.fail("Should have raised OSError")
  201. @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
  202. def testOpenDirFD(self):
  203. fd = os.open('.', os.O_RDONLY)
  204. with self.assertRaises(OSError) as cm:
  205. self.FileIO(fd, 'r')
  206. os.close(fd)
  207. self.assertEqual(cm.exception.errno, errno.EISDIR)
  208. #A set of functions testing that we get expected behaviour if someone has
  209. #manually closed the internal file descriptor. First, a decorator:
  210. def ClosedFD(func):
  211. @wraps(func)
  212. def wrapper(self):
  213. #forcibly close the fd before invoking the problem function
  214. f = self.f
  215. os.close(f.fileno())
  216. try:
  217. func(self, f)
  218. finally:
  219. try:
  220. self.f.close()
  221. except OSError:
  222. pass
  223. return wrapper
  224. def ClosedFDRaises(func):
  225. @wraps(func)
  226. def wrapper(self):
  227. #forcibly close the fd before invoking the problem function
  228. f = self.f
  229. os.close(f.fileno())
  230. try:
  231. func(self, f)
  232. except OSError as e:
  233. self.assertEqual(e.errno, errno.EBADF)
  234. else:
  235. self.fail("Should have raised OSError")
  236. finally:
  237. try:
  238. self.f.close()
  239. except OSError:
  240. pass
  241. return wrapper
  242. @ClosedFDRaises
  243. def testErrnoOnClose(self, f):
  244. f.close()
  245. @ClosedFDRaises
  246. def testErrnoOnClosedWrite(self, f):
  247. f.write(b'a')
  248. @ClosedFDRaises
  249. def testErrnoOnClosedSeek(self, f):
  250. f.seek(0)
  251. @ClosedFDRaises
  252. def testErrnoOnClosedTell(self, f):
  253. f.tell()
  254. @ClosedFDRaises
  255. def testErrnoOnClosedTruncate(self, f):
  256. f.truncate(0)
  257. @ClosedFD
  258. def testErrnoOnClosedSeekable(self, f):
  259. f.seekable()
  260. @ClosedFD
  261. def testErrnoOnClosedReadable(self, f):
  262. f.readable()
  263. @ClosedFD
  264. def testErrnoOnClosedWritable(self, f):
  265. f.writable()
  266. @ClosedFD
  267. def testErrnoOnClosedFileno(self, f):
  268. f.fileno()
  269. @ClosedFD
  270. def testErrnoOnClosedIsatty(self, f):
  271. self.assertEqual(f.isatty(), False)
  272. def ReopenForRead(self):
  273. try:
  274. self.f.close()
  275. except OSError:
  276. pass
  277. self.f = self.FileIO(TESTFN, 'r')
  278. os.close(self.f.fileno())
  279. return self.f
  280. @ClosedFDRaises
  281. def testErrnoOnClosedRead(self, f):
  282. f = self.ReopenForRead()
  283. f.read(1)
  284. @ClosedFDRaises
  285. def testErrnoOnClosedReadall(self, f):
  286. f = self.ReopenForRead()
  287. f.readall()
  288. @ClosedFDRaises
  289. def testErrnoOnClosedReadinto(self, f):
  290. f = self.ReopenForRead()
  291. a = array('b', b'x'*10)
  292. f.readinto(a)
  293. class CAutoFileTests(AutoFileTests, unittest.TestCase):
  294. FileIO = _io.FileIO
  295. modulename = '_io'
  296. class PyAutoFileTests(AutoFileTests, unittest.TestCase):
  297. FileIO = _pyio.FileIO
  298. modulename = '_pyio'
  299. class OtherFileTests:
  300. def testAbles(self):
  301. try:
  302. f = self.FileIO(TESTFN, "w")
  303. self.assertEqual(f.readable(), False)
  304. self.assertEqual(f.writable(), True)
  305. self.assertEqual(f.seekable(), True)
  306. f.close()
  307. f = self.FileIO(TESTFN, "r")
  308. self.assertEqual(f.readable(), True)
  309. self.assertEqual(f.writable(), False)
  310. self.assertEqual(f.seekable(), True)
  311. f.close()
  312. f = self.FileIO(TESTFN, "a+")
  313. self.assertEqual(f.readable(), True)
  314. self.assertEqual(f.writable(), True)
  315. self.assertEqual(f.seekable(), True)
  316. self.assertEqual(f.isatty(), False)
  317. f.close()
  318. if sys.platform != "win32" and not is_emscripten:
  319. try:
  320. f = self.FileIO("/dev/tty", "a")
  321. except OSError:
  322. # When run in a cron job there just aren't any
  323. # ttys, so skip the test. This also handles other
  324. # OS'es that don't support /dev/tty.
  325. pass
  326. else:
  327. self.assertEqual(f.readable(), False)
  328. self.assertEqual(f.writable(), True)
  329. if sys.platform != "darwin" and \
  330. 'bsd' not in sys.platform and \
  331. not sys.platform.startswith(('sunos', 'aix')):
  332. # Somehow /dev/tty appears seekable on some BSDs
  333. self.assertEqual(f.seekable(), False)
  334. self.assertEqual(f.isatty(), True)
  335. f.close()
  336. finally:
  337. os.unlink(TESTFN)
  338. def testInvalidModeStrings(self):
  339. # check invalid mode strings
  340. for mode in ("", "aU", "wU+", "rw", "rt"):
  341. try:
  342. f = self.FileIO(TESTFN, mode)
  343. except ValueError:
  344. pass
  345. else:
  346. f.close()
  347. self.fail('%r is an invalid file mode' % mode)
  348. def testModeStrings(self):
  349. # test that the mode attribute is correct for various mode strings
  350. # given as init args
  351. try:
  352. for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
  353. ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
  354. ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
  355. ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
  356. # read modes are last so that TESTFN will exist first
  357. with self.FileIO(TESTFN, modes[0]) as f:
  358. self.assertEqual(f.mode, modes[1])
  359. finally:
  360. if os.path.exists(TESTFN):
  361. os.unlink(TESTFN)
  362. def testUnicodeOpen(self):
  363. # verify repr works for unicode too
  364. f = self.FileIO(str(TESTFN), "w")
  365. f.close()
  366. os.unlink(TESTFN)
  367. def testBytesOpen(self):
  368. # Opening a bytes filename
  369. try:
  370. fn = TESTFN.encode("ascii")
  371. except UnicodeEncodeError:
  372. self.skipTest('could not encode %r to ascii' % TESTFN)
  373. f = self.FileIO(fn, "w")
  374. try:
  375. f.write(b"abc")
  376. f.close()
  377. with open(TESTFN, "rb") as f:
  378. self.assertEqual(f.read(), b"abc")
  379. finally:
  380. os.unlink(TESTFN)
  381. @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8',
  382. "test only works for utf-8 filesystems")
  383. def testUtf8BytesOpen(self):
  384. # Opening a UTF-8 bytes filename
  385. try:
  386. fn = TESTFN_UNICODE.encode("utf-8")
  387. except UnicodeEncodeError:
  388. self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE)
  389. f = self.FileIO(fn, "w")
  390. try:
  391. f.write(b"abc")
  392. f.close()
  393. with open(TESTFN_UNICODE, "rb") as f:
  394. self.assertEqual(f.read(), b"abc")
  395. finally:
  396. os.unlink(TESTFN_UNICODE)
  397. def testConstructorHandlesNULChars(self):
  398. fn_with_NUL = 'foo\0bar'
  399. self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
  400. self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
  401. def testInvalidFd(self):
  402. self.assertRaises(ValueError, self.FileIO, -10)
  403. self.assertRaises(OSError, self.FileIO, make_bad_fd())
  404. if sys.platform == 'win32':
  405. import msvcrt
  406. self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
  407. def testBadModeArgument(self):
  408. # verify that we get a sensible error message for bad mode argument
  409. bad_mode = "qwerty"
  410. try:
  411. f = self.FileIO(TESTFN, bad_mode)
  412. except ValueError as msg:
  413. if msg.args[0] != 0:
  414. s = str(msg)
  415. if TESTFN in s or bad_mode not in s:
  416. self.fail("bad error message for invalid mode: %s" % s)
  417. # if msg.args[0] == 0, we're probably on Windows where there may be
  418. # no obvious way to discover why open() failed.
  419. else:
  420. f.close()
  421. self.fail("no error for invalid mode: %s" % bad_mode)
  422. def testTruncate(self):
  423. f = self.FileIO(TESTFN, 'w')
  424. f.write(bytes(bytearray(range(10))))
  425. self.assertEqual(f.tell(), 10)
  426. f.truncate(5)
  427. self.assertEqual(f.tell(), 10)
  428. self.assertEqual(f.seek(0, io.SEEK_END), 5)
  429. f.truncate(15)
  430. self.assertEqual(f.tell(), 5)
  431. self.assertEqual(f.seek(0, io.SEEK_END), 15)
  432. f.close()
  433. def testTruncateOnWindows(self):
  434. def bug801631():
  435. # SF bug <http://www.python.org/sf/801631>
  436. # "file.truncate fault on windows"
  437. f = self.FileIO(TESTFN, 'w')
  438. f.write(bytes(range(11)))
  439. f.close()
  440. f = self.FileIO(TESTFN,'r+')
  441. data = f.read(5)
  442. if data != bytes(range(5)):
  443. self.fail("Read on file opened for update failed %r" % data)
  444. if f.tell() != 5:
  445. self.fail("File pos after read wrong %d" % f.tell())
  446. f.truncate()
  447. if f.tell() != 5:
  448. self.fail("File pos after ftruncate wrong %d" % f.tell())
  449. f.close()
  450. size = os.path.getsize(TESTFN)
  451. if size != 5:
  452. self.fail("File size after ftruncate wrong %d" % size)
  453. try:
  454. bug801631()
  455. finally:
  456. os.unlink(TESTFN)
  457. def testAppend(self):
  458. try:
  459. f = open(TESTFN, 'wb')
  460. f.write(b'spam')
  461. f.close()
  462. f = open(TESTFN, 'ab')
  463. f.write(b'eggs')
  464. f.close()
  465. f = open(TESTFN, 'rb')
  466. d = f.read()
  467. f.close()
  468. self.assertEqual(d, b'spameggs')
  469. finally:
  470. try:
  471. os.unlink(TESTFN)
  472. except:
  473. pass
  474. def testInvalidInit(self):
  475. self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
  476. def testWarnings(self):
  477. with check_warnings(quiet=True) as w:
  478. self.assertEqual(w.warnings, [])
  479. self.assertRaises(TypeError, self.FileIO, [])
  480. self.assertEqual(w.warnings, [])
  481. self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
  482. self.assertEqual(w.warnings, [])
  483. def testUnclosedFDOnException(self):
  484. class MyException(Exception): pass
  485. class MyFileIO(self.FileIO):
  486. def __setattr__(self, name, value):
  487. if name == "name":
  488. raise MyException("blocked setting name")
  489. return super(MyFileIO, self).__setattr__(name, value)
  490. fd = os.open(__file__, os.O_RDONLY)
  491. self.assertRaises(MyException, MyFileIO, fd)
  492. os.close(fd) # should not raise OSError(EBADF)
  493. class COtherFileTests(OtherFileTests, unittest.TestCase):
  494. FileIO = _io.FileIO
  495. modulename = '_io'
  496. @cpython_only
  497. def testInvalidFd_overflow(self):
  498. # Issue 15989
  499. import _testcapi
  500. self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
  501. self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
  502. def test_open_code(self):
  503. # Check that the default behaviour of open_code matches
  504. # open("rb")
  505. with self.FileIO(__file__, "rb") as f:
  506. expected = f.read()
  507. with _io.open_code(__file__) as f:
  508. actual = f.read()
  509. self.assertEqual(expected, actual)
  510. class PyOtherFileTests(OtherFileTests, unittest.TestCase):
  511. FileIO = _pyio.FileIO
  512. modulename = '_pyio'
  513. def test_open_code(self):
  514. # Check that the default behaviour of open_code matches
  515. # open("rb")
  516. with self.FileIO(__file__, "rb") as f:
  517. expected = f.read()
  518. with check_warnings(quiet=True) as w:
  519. # Always test _open_code_with_warning
  520. with _pyio._open_code_with_warning(__file__) as f:
  521. actual = f.read()
  522. self.assertEqual(expected, actual)
  523. self.assertNotEqual(w.warnings, [])
  524. def tearDownModule():
  525. # Historically, these tests have been sloppy about removing TESTFN.
  526. # So get rid of it no matter what.
  527. if os.path.exists(TESTFN):
  528. os.unlink(TESTFN)
  529. if __name__ == '__main__':
  530. unittest.main()