test_tempfile.py 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744
  1. # tempfile.py unit tests.
  2. import tempfile
  3. import errno
  4. import io
  5. import os
  6. import pathlib
  7. import sys
  8. import re
  9. import warnings
  10. import contextlib
  11. import stat
  12. import types
  13. import weakref
  14. from unittest import mock
  15. import unittest
  16. from test import support
  17. from test.support import os_helper
  18. from test.support import script_helper
  19. from test.support import warnings_helper
  20. has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
  21. has_spawnl = hasattr(os, 'spawnl')
  22. # TEST_FILES may need to be tweaked for systems depending on the maximum
  23. # number of files that can be opened at one time (see ulimit -n)
  24. if sys.platform.startswith('openbsd'):
  25. TEST_FILES = 48
  26. else:
  27. TEST_FILES = 100
  28. # This is organized as one test for each chunk of code in tempfile.py,
  29. # in order of their appearance in the file. Testing which requires
  30. # threads is not done here.
  31. class TestLowLevelInternals(unittest.TestCase):
  32. def test_infer_return_type_singles(self):
  33. self.assertIs(str, tempfile._infer_return_type(''))
  34. self.assertIs(bytes, tempfile._infer_return_type(b''))
  35. self.assertIs(str, tempfile._infer_return_type(None))
  36. def test_infer_return_type_multiples(self):
  37. self.assertIs(str, tempfile._infer_return_type('', ''))
  38. self.assertIs(bytes, tempfile._infer_return_type(b'', b''))
  39. with self.assertRaises(TypeError):
  40. tempfile._infer_return_type('', b'')
  41. with self.assertRaises(TypeError):
  42. tempfile._infer_return_type(b'', '')
  43. def test_infer_return_type_multiples_and_none(self):
  44. self.assertIs(str, tempfile._infer_return_type(None, ''))
  45. self.assertIs(str, tempfile._infer_return_type('', None))
  46. self.assertIs(str, tempfile._infer_return_type(None, None))
  47. self.assertIs(bytes, tempfile._infer_return_type(b'', None))
  48. self.assertIs(bytes, tempfile._infer_return_type(None, b''))
  49. with self.assertRaises(TypeError):
  50. tempfile._infer_return_type('', None, b'')
  51. with self.assertRaises(TypeError):
  52. tempfile._infer_return_type(b'', None, '')
  53. def test_infer_return_type_pathlib(self):
  54. self.assertIs(str, tempfile._infer_return_type(pathlib.Path('/')))
  55. def test_infer_return_type_pathlike(self):
  56. class Path:
  57. def __init__(self, path):
  58. self.path = path
  59. def __fspath__(self):
  60. return self.path
  61. self.assertIs(str, tempfile._infer_return_type(Path('/')))
  62. self.assertIs(bytes, tempfile._infer_return_type(Path(b'/')))
  63. self.assertIs(str, tempfile._infer_return_type('', Path('')))
  64. self.assertIs(bytes, tempfile._infer_return_type(b'', Path(b'')))
  65. self.assertIs(bytes, tempfile._infer_return_type(None, Path(b'')))
  66. self.assertIs(str, tempfile._infer_return_type(None, Path('')))
  67. with self.assertRaises(TypeError):
  68. tempfile._infer_return_type('', Path(b''))
  69. with self.assertRaises(TypeError):
  70. tempfile._infer_return_type(b'', Path(''))
  71. # Common functionality.
  72. class BaseTestCase(unittest.TestCase):
  73. str_check = re.compile(r"^[a-z0-9_-]{8}$")
  74. b_check = re.compile(br"^[a-z0-9_-]{8}$")
  75. def setUp(self):
  76. self.enterContext(warnings_helper.check_warnings())
  77. warnings.filterwarnings("ignore", category=RuntimeWarning,
  78. message="mktemp", module=__name__)
  79. def nameCheck(self, name, dir, pre, suf):
  80. (ndir, nbase) = os.path.split(name)
  81. npre = nbase[:len(pre)]
  82. nsuf = nbase[len(nbase)-len(suf):]
  83. if dir is not None:
  84. self.assertIs(
  85. type(name),
  86. str
  87. if type(dir) is str or isinstance(dir, os.PathLike) else
  88. bytes,
  89. "unexpected return type",
  90. )
  91. if pre is not None:
  92. self.assertIs(type(name), str if type(pre) is str else bytes,
  93. "unexpected return type")
  94. if suf is not None:
  95. self.assertIs(type(name), str if type(suf) is str else bytes,
  96. "unexpected return type")
  97. if (dir, pre, suf) == (None, None, None):
  98. self.assertIs(type(name), str, "default return type must be str")
  99. # check for equality of the absolute paths!
  100. self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
  101. "file %r not in directory %r" % (name, dir))
  102. self.assertEqual(npre, pre,
  103. "file %r does not begin with %r" % (nbase, pre))
  104. self.assertEqual(nsuf, suf,
  105. "file %r does not end with %r" % (nbase, suf))
  106. nbase = nbase[len(pre):len(nbase)-len(suf)]
  107. check = self.str_check if isinstance(nbase, str) else self.b_check
  108. self.assertTrue(check.match(nbase),
  109. "random characters %r do not match %r"
  110. % (nbase, check.pattern))
  111. class TestExports(BaseTestCase):
  112. def test_exports(self):
  113. # There are no surprising symbols in the tempfile module
  114. dict = tempfile.__dict__
  115. expected = {
  116. "NamedTemporaryFile" : 1,
  117. "TemporaryFile" : 1,
  118. "mkstemp" : 1,
  119. "mkdtemp" : 1,
  120. "mktemp" : 1,
  121. "TMP_MAX" : 1,
  122. "gettempprefix" : 1,
  123. "gettempprefixb" : 1,
  124. "gettempdir" : 1,
  125. "gettempdirb" : 1,
  126. "tempdir" : 1,
  127. "template" : 1,
  128. "SpooledTemporaryFile" : 1,
  129. "TemporaryDirectory" : 1,
  130. }
  131. unexp = []
  132. for key in dict:
  133. if key[0] != '_' and key not in expected:
  134. unexp.append(key)
  135. self.assertTrue(len(unexp) == 0,
  136. "unexpected keys: %s" % unexp)
  137. class TestRandomNameSequence(BaseTestCase):
  138. """Test the internal iterator object _RandomNameSequence."""
  139. def setUp(self):
  140. self.r = tempfile._RandomNameSequence()
  141. super().setUp()
  142. def test_get_eight_char_str(self):
  143. # _RandomNameSequence returns a eight-character string
  144. s = next(self.r)
  145. self.nameCheck(s, '', '', '')
  146. def test_many(self):
  147. # _RandomNameSequence returns no duplicate strings (stochastic)
  148. dict = {}
  149. r = self.r
  150. for i in range(TEST_FILES):
  151. s = next(r)
  152. self.nameCheck(s, '', '', '')
  153. self.assertNotIn(s, dict)
  154. dict[s] = 1
  155. def supports_iter(self):
  156. # _RandomNameSequence supports the iterator protocol
  157. i = 0
  158. r = self.r
  159. for s in r:
  160. i += 1
  161. if i == 20:
  162. break
  163. @support.requires_fork()
  164. def test_process_awareness(self):
  165. # ensure that the random source differs between
  166. # child and parent.
  167. read_fd, write_fd = os.pipe()
  168. pid = None
  169. try:
  170. pid = os.fork()
  171. if not pid:
  172. # child process
  173. os.close(read_fd)
  174. os.write(write_fd, next(self.r).encode("ascii"))
  175. os.close(write_fd)
  176. # bypass the normal exit handlers- leave those to
  177. # the parent.
  178. os._exit(0)
  179. # parent process
  180. parent_value = next(self.r)
  181. child_value = os.read(read_fd, len(parent_value)).decode("ascii")
  182. finally:
  183. if pid:
  184. support.wait_process(pid, exitcode=0)
  185. os.close(read_fd)
  186. os.close(write_fd)
  187. self.assertNotEqual(child_value, parent_value)
  188. class TestCandidateTempdirList(BaseTestCase):
  189. """Test the internal function _candidate_tempdir_list."""
  190. def test_nonempty_list(self):
  191. # _candidate_tempdir_list returns a nonempty list of strings
  192. cand = tempfile._candidate_tempdir_list()
  193. self.assertFalse(len(cand) == 0)
  194. for c in cand:
  195. self.assertIsInstance(c, str)
  196. def test_wanted_dirs(self):
  197. # _candidate_tempdir_list contains the expected directories
  198. # Make sure the interesting environment variables are all set.
  199. with os_helper.EnvironmentVarGuard() as env:
  200. for envname in 'TMPDIR', 'TEMP', 'TMP':
  201. dirname = os.getenv(envname)
  202. if not dirname:
  203. env[envname] = os.path.abspath(envname)
  204. cand = tempfile._candidate_tempdir_list()
  205. for envname in 'TMPDIR', 'TEMP', 'TMP':
  206. dirname = os.getenv(envname)
  207. if not dirname: raise ValueError
  208. self.assertIn(dirname, cand)
  209. try:
  210. dirname = os.getcwd()
  211. except (AttributeError, OSError):
  212. dirname = os.curdir
  213. self.assertIn(dirname, cand)
  214. # Not practical to try to verify the presence of OS-specific
  215. # paths in this list.
  216. # We test _get_default_tempdir some more by testing gettempdir.
  217. class TestGetDefaultTempdir(BaseTestCase):
  218. """Test _get_default_tempdir()."""
  219. def test_no_files_left_behind(self):
  220. # use a private empty directory
  221. with tempfile.TemporaryDirectory() as our_temp_directory:
  222. # force _get_default_tempdir() to consider our empty directory
  223. def our_candidate_list():
  224. return [our_temp_directory]
  225. with support.swap_attr(tempfile, "_candidate_tempdir_list",
  226. our_candidate_list):
  227. # verify our directory is empty after _get_default_tempdir()
  228. tempfile._get_default_tempdir()
  229. self.assertEqual(os.listdir(our_temp_directory), [])
  230. def raise_OSError(*args, **kwargs):
  231. raise OSError()
  232. with support.swap_attr(os, "open", raise_OSError):
  233. # test again with failing os.open()
  234. with self.assertRaises(FileNotFoundError):
  235. tempfile._get_default_tempdir()
  236. self.assertEqual(os.listdir(our_temp_directory), [])
  237. with support.swap_attr(os, "write", raise_OSError):
  238. # test again with failing os.write()
  239. with self.assertRaises(FileNotFoundError):
  240. tempfile._get_default_tempdir()
  241. self.assertEqual(os.listdir(our_temp_directory), [])
  242. class TestGetCandidateNames(BaseTestCase):
  243. """Test the internal function _get_candidate_names."""
  244. def test_retval(self):
  245. # _get_candidate_names returns a _RandomNameSequence object
  246. obj = tempfile._get_candidate_names()
  247. self.assertIsInstance(obj, tempfile._RandomNameSequence)
  248. def test_same_thing(self):
  249. # _get_candidate_names always returns the same object
  250. a = tempfile._get_candidate_names()
  251. b = tempfile._get_candidate_names()
  252. self.assertTrue(a is b)
  253. @contextlib.contextmanager
  254. def _inside_empty_temp_dir():
  255. dir = tempfile.mkdtemp()
  256. try:
  257. with support.swap_attr(tempfile, 'tempdir', dir):
  258. yield
  259. finally:
  260. os_helper.rmtree(dir)
  261. def _mock_candidate_names(*names):
  262. return support.swap_attr(tempfile,
  263. '_get_candidate_names',
  264. lambda: iter(names))
  265. class TestBadTempdir:
  266. @unittest.skipIf(
  267. support.is_emscripten, "Emscripten cannot remove write bits."
  268. )
  269. def test_read_only_directory(self):
  270. with _inside_empty_temp_dir():
  271. oldmode = mode = os.stat(tempfile.tempdir).st_mode
  272. mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
  273. os.chmod(tempfile.tempdir, mode)
  274. try:
  275. if os.access(tempfile.tempdir, os.W_OK):
  276. self.skipTest("can't set the directory read-only")
  277. with self.assertRaises(PermissionError):
  278. self.make_temp()
  279. self.assertEqual(os.listdir(tempfile.tempdir), [])
  280. finally:
  281. os.chmod(tempfile.tempdir, oldmode)
  282. def test_nonexisting_directory(self):
  283. with _inside_empty_temp_dir():
  284. tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
  285. with support.swap_attr(tempfile, 'tempdir', tempdir):
  286. with self.assertRaises(FileNotFoundError):
  287. self.make_temp()
  288. def test_non_directory(self):
  289. with _inside_empty_temp_dir():
  290. tempdir = os.path.join(tempfile.tempdir, 'file')
  291. open(tempdir, 'wb').close()
  292. with support.swap_attr(tempfile, 'tempdir', tempdir):
  293. with self.assertRaises((NotADirectoryError, FileNotFoundError)):
  294. self.make_temp()
  295. class TestMkstempInner(TestBadTempdir, BaseTestCase):
  296. """Test the internal function _mkstemp_inner."""
  297. class mkstemped:
  298. _bflags = tempfile._bin_openflags
  299. _tflags = tempfile._text_openflags
  300. _close = os.close
  301. _unlink = os.unlink
  302. def __init__(self, dir, pre, suf, bin):
  303. if bin: flags = self._bflags
  304. else: flags = self._tflags
  305. output_type = tempfile._infer_return_type(dir, pre, suf)
  306. (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type)
  307. def write(self, str):
  308. os.write(self.fd, str)
  309. def __del__(self):
  310. self._close(self.fd)
  311. self._unlink(self.name)
  312. def do_create(self, dir=None, pre=None, suf=None, bin=1):
  313. output_type = tempfile._infer_return_type(dir, pre, suf)
  314. if dir is None:
  315. if output_type is str:
  316. dir = tempfile.gettempdir()
  317. else:
  318. dir = tempfile.gettempdirb()
  319. if pre is None:
  320. pre = output_type()
  321. if suf is None:
  322. suf = output_type()
  323. file = self.mkstemped(dir, pre, suf, bin)
  324. self.nameCheck(file.name, dir, pre, suf)
  325. return file
  326. def test_basic(self):
  327. # _mkstemp_inner can create files
  328. self.do_create().write(b"blat")
  329. self.do_create(pre="a").write(b"blat")
  330. self.do_create(suf="b").write(b"blat")
  331. self.do_create(pre="a", suf="b").write(b"blat")
  332. self.do_create(pre="aa", suf=".txt").write(b"blat")
  333. def test_basic_with_bytes_names(self):
  334. # _mkstemp_inner can create files when given name parts all
  335. # specified as bytes.
  336. dir_b = tempfile.gettempdirb()
  337. self.do_create(dir=dir_b, suf=b"").write(b"blat")
  338. self.do_create(dir=dir_b, pre=b"a").write(b"blat")
  339. self.do_create(dir=dir_b, suf=b"b").write(b"blat")
  340. self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
  341. self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
  342. # Can't mix str & binary types in the args.
  343. with self.assertRaises(TypeError):
  344. self.do_create(dir="", suf=b"").write(b"blat")
  345. with self.assertRaises(TypeError):
  346. self.do_create(dir=dir_b, pre="").write(b"blat")
  347. with self.assertRaises(TypeError):
  348. self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")
  349. def test_basic_many(self):
  350. # _mkstemp_inner can create many files (stochastic)
  351. extant = list(range(TEST_FILES))
  352. for i in extant:
  353. extant[i] = self.do_create(pre="aa")
  354. def test_choose_directory(self):
  355. # _mkstemp_inner can create files in a user-selected directory
  356. dir = tempfile.mkdtemp()
  357. try:
  358. self.do_create(dir=dir).write(b"blat")
  359. self.do_create(dir=pathlib.Path(dir)).write(b"blat")
  360. finally:
  361. support.gc_collect() # For PyPy or other GCs.
  362. os.rmdir(dir)
  363. @os_helper.skip_unless_working_chmod
  364. def test_file_mode(self):
  365. # _mkstemp_inner creates files with the proper mode
  366. file = self.do_create()
  367. mode = stat.S_IMODE(os.stat(file.name).st_mode)
  368. expected = 0o600
  369. if sys.platform == 'win32':
  370. # There's no distinction among 'user', 'group' and 'world';
  371. # replicate the 'user' bits.
  372. user = expected >> 6
  373. expected = user * (1 + 8 + 64)
  374. self.assertEqual(mode, expected)
  375. @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
  376. @support.requires_subprocess()
  377. def test_noinherit(self):
  378. # _mkstemp_inner file handles are not inherited by child processes
  379. if support.verbose:
  380. v="v"
  381. else:
  382. v="q"
  383. file = self.do_create()
  384. self.assertEqual(os.get_inheritable(file.fd), False)
  385. fd = "%d" % file.fd
  386. try:
  387. me = __file__
  388. except NameError:
  389. me = sys.argv[0]
  390. # We have to exec something, so that FD_CLOEXEC will take
  391. # effect. The core of this test is therefore in
  392. # tf_inherit_check.py, which see.
  393. tester = os.path.join(os.path.dirname(os.path.abspath(me)),
  394. "tf_inherit_check.py")
  395. # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
  396. # but an arg with embedded spaces should be decorated with double
  397. # quotes on each end
  398. if sys.platform == 'win32':
  399. decorated = '"%s"' % sys.executable
  400. tester = '"%s"' % tester
  401. else:
  402. decorated = sys.executable
  403. retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
  404. self.assertFalse(retval < 0,
  405. "child process caught fatal signal %d" % -retval)
  406. self.assertFalse(retval > 0, "child process reports failure %d"%retval)
  407. @unittest.skipUnless(has_textmode, "text mode not available")
  408. def test_textmode(self):
  409. # _mkstemp_inner can create files in text mode
  410. # A text file is truncated at the first Ctrl+Z byte
  411. f = self.do_create(bin=0)
  412. f.write(b"blat\x1a")
  413. f.write(b"extra\n")
  414. os.lseek(f.fd, 0, os.SEEK_SET)
  415. self.assertEqual(os.read(f.fd, 20), b"blat")
  416. def make_temp(self):
  417. return tempfile._mkstemp_inner(tempfile.gettempdir(),
  418. tempfile.gettempprefix(),
  419. '',
  420. tempfile._bin_openflags,
  421. str)
  422. def test_collision_with_existing_file(self):
  423. # _mkstemp_inner tries another name when a file with
  424. # the chosen name already exists
  425. with _inside_empty_temp_dir(), \
  426. _mock_candidate_names('aaa', 'aaa', 'bbb'):
  427. (fd1, name1) = self.make_temp()
  428. os.close(fd1)
  429. self.assertTrue(name1.endswith('aaa'))
  430. (fd2, name2) = self.make_temp()
  431. os.close(fd2)
  432. self.assertTrue(name2.endswith('bbb'))
  433. def test_collision_with_existing_directory(self):
  434. # _mkstemp_inner tries another name when a directory with
  435. # the chosen name already exists
  436. with _inside_empty_temp_dir(), \
  437. _mock_candidate_names('aaa', 'aaa', 'bbb'):
  438. dir = tempfile.mkdtemp()
  439. self.assertTrue(dir.endswith('aaa'))
  440. (fd, name) = self.make_temp()
  441. os.close(fd)
  442. self.assertTrue(name.endswith('bbb'))
  443. class TestGetTempPrefix(BaseTestCase):
  444. """Test gettempprefix()."""
  445. def test_sane_template(self):
  446. # gettempprefix returns a nonempty prefix string
  447. p = tempfile.gettempprefix()
  448. self.assertIsInstance(p, str)
  449. self.assertGreater(len(p), 0)
  450. pb = tempfile.gettempprefixb()
  451. self.assertIsInstance(pb, bytes)
  452. self.assertGreater(len(pb), 0)
  453. def test_usable_template(self):
  454. # gettempprefix returns a usable prefix string
  455. # Create a temp directory, avoiding use of the prefix.
  456. # Then attempt to create a file whose name is
  457. # prefix + 'xxxxxx.xxx' in that directory.
  458. p = tempfile.gettempprefix() + "xxxxxx.xxx"
  459. d = tempfile.mkdtemp(prefix="")
  460. try:
  461. p = os.path.join(d, p)
  462. fd = os.open(p, os.O_RDWR | os.O_CREAT)
  463. os.close(fd)
  464. os.unlink(p)
  465. finally:
  466. os.rmdir(d)
  467. class TestGetTempDir(BaseTestCase):
  468. """Test gettempdir()."""
  469. def test_directory_exists(self):
  470. # gettempdir returns a directory which exists
  471. for d in (tempfile.gettempdir(), tempfile.gettempdirb()):
  472. self.assertTrue(os.path.isabs(d) or d == os.curdir,
  473. "%r is not an absolute path" % d)
  474. self.assertTrue(os.path.isdir(d),
  475. "%r is not a directory" % d)
  476. def test_directory_writable(self):
  477. # gettempdir returns a directory writable by the user
  478. # sneaky: just instantiate a NamedTemporaryFile, which
  479. # defaults to writing into the directory returned by
  480. # gettempdir.
  481. with tempfile.NamedTemporaryFile() as file:
  482. file.write(b"blat")
  483. def test_same_thing(self):
  484. # gettempdir always returns the same object
  485. a = tempfile.gettempdir()
  486. b = tempfile.gettempdir()
  487. c = tempfile.gettempdirb()
  488. self.assertTrue(a is b)
  489. self.assertNotEqual(type(a), type(c))
  490. self.assertEqual(a, os.fsdecode(c))
  491. def test_case_sensitive(self):
  492. # gettempdir should not flatten its case
  493. # even on a case-insensitive file system
  494. case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
  495. _tempdir, tempfile.tempdir = tempfile.tempdir, None
  496. try:
  497. with os_helper.EnvironmentVarGuard() as env:
  498. # Fake the first env var which is checked as a candidate
  499. env["TMPDIR"] = case_sensitive_tempdir
  500. self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
  501. finally:
  502. tempfile.tempdir = _tempdir
  503. os_helper.rmdir(case_sensitive_tempdir)
  504. class TestMkstemp(BaseTestCase):
  505. """Test mkstemp()."""
  506. def do_create(self, dir=None, pre=None, suf=None):
  507. output_type = tempfile._infer_return_type(dir, pre, suf)
  508. if dir is None:
  509. if output_type is str:
  510. dir = tempfile.gettempdir()
  511. else:
  512. dir = tempfile.gettempdirb()
  513. if pre is None:
  514. pre = output_type()
  515. if suf is None:
  516. suf = output_type()
  517. (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
  518. (ndir, nbase) = os.path.split(name)
  519. adir = os.path.abspath(dir)
  520. self.assertEqual(adir, ndir,
  521. "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
  522. try:
  523. self.nameCheck(name, dir, pre, suf)
  524. finally:
  525. os.close(fd)
  526. os.unlink(name)
  527. def test_basic(self):
  528. # mkstemp can create files
  529. self.do_create()
  530. self.do_create(pre="a")
  531. self.do_create(suf="b")
  532. self.do_create(pre="a", suf="b")
  533. self.do_create(pre="aa", suf=".txt")
  534. self.do_create(dir=".")
  535. def test_basic_with_bytes_names(self):
  536. # mkstemp can create files when given name parts all
  537. # specified as bytes.
  538. d = tempfile.gettempdirb()
  539. self.do_create(dir=d, suf=b"")
  540. self.do_create(dir=d, pre=b"a")
  541. self.do_create(dir=d, suf=b"b")
  542. self.do_create(dir=d, pre=b"a", suf=b"b")
  543. self.do_create(dir=d, pre=b"aa", suf=b".txt")
  544. self.do_create(dir=b".")
  545. with self.assertRaises(TypeError):
  546. self.do_create(dir=".", pre=b"aa", suf=b".txt")
  547. with self.assertRaises(TypeError):
  548. self.do_create(dir=b".", pre="aa", suf=b".txt")
  549. with self.assertRaises(TypeError):
  550. self.do_create(dir=b".", pre=b"aa", suf=".txt")
  551. def test_choose_directory(self):
  552. # mkstemp can create directories in a user-selected directory
  553. dir = tempfile.mkdtemp()
  554. try:
  555. self.do_create(dir=dir)
  556. self.do_create(dir=pathlib.Path(dir))
  557. finally:
  558. os.rmdir(dir)
  559. def test_for_tempdir_is_bytes_issue40701_api_warts(self):
  560. orig_tempdir = tempfile.tempdir
  561. self.assertIsInstance(tempfile.tempdir, (str, type(None)))
  562. try:
  563. fd, path = tempfile.mkstemp()
  564. os.close(fd)
  565. os.unlink(path)
  566. self.assertIsInstance(path, str)
  567. tempfile.tempdir = tempfile.gettempdirb()
  568. self.assertIsInstance(tempfile.tempdir, bytes)
  569. self.assertIsInstance(tempfile.gettempdir(), str)
  570. self.assertIsInstance(tempfile.gettempdirb(), bytes)
  571. fd, path = tempfile.mkstemp()
  572. os.close(fd)
  573. os.unlink(path)
  574. self.assertIsInstance(path, bytes)
  575. fd, path = tempfile.mkstemp(suffix='.txt')
  576. os.close(fd)
  577. os.unlink(path)
  578. self.assertIsInstance(path, str)
  579. fd, path = tempfile.mkstemp(prefix='test-temp-')
  580. os.close(fd)
  581. os.unlink(path)
  582. self.assertIsInstance(path, str)
  583. fd, path = tempfile.mkstemp(dir=tempfile.gettempdir())
  584. os.close(fd)
  585. os.unlink(path)
  586. self.assertIsInstance(path, str)
  587. finally:
  588. tempfile.tempdir = orig_tempdir
  589. class TestMkdtemp(TestBadTempdir, BaseTestCase):
  590. """Test mkdtemp()."""
  591. def make_temp(self):
  592. return tempfile.mkdtemp()
  593. def do_create(self, dir=None, pre=None, suf=None):
  594. output_type = tempfile._infer_return_type(dir, pre, suf)
  595. if dir is None:
  596. if output_type is str:
  597. dir = tempfile.gettempdir()
  598. else:
  599. dir = tempfile.gettempdirb()
  600. if pre is None:
  601. pre = output_type()
  602. if suf is None:
  603. suf = output_type()
  604. name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
  605. try:
  606. self.nameCheck(name, dir, pre, suf)
  607. return name
  608. except:
  609. os.rmdir(name)
  610. raise
  611. def test_basic(self):
  612. # mkdtemp can create directories
  613. os.rmdir(self.do_create())
  614. os.rmdir(self.do_create(pre="a"))
  615. os.rmdir(self.do_create(suf="b"))
  616. os.rmdir(self.do_create(pre="a", suf="b"))
  617. os.rmdir(self.do_create(pre="aa", suf=".txt"))
  618. def test_basic_with_bytes_names(self):
  619. # mkdtemp can create directories when given all binary parts
  620. d = tempfile.gettempdirb()
  621. os.rmdir(self.do_create(dir=d))
  622. os.rmdir(self.do_create(dir=d, pre=b"a"))
  623. os.rmdir(self.do_create(dir=d, suf=b"b"))
  624. os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b"))
  625. os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt"))
  626. with self.assertRaises(TypeError):
  627. os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt"))
  628. with self.assertRaises(TypeError):
  629. os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt"))
  630. with self.assertRaises(TypeError):
  631. os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt"))
  632. def test_basic_many(self):
  633. # mkdtemp can create many directories (stochastic)
  634. extant = list(range(TEST_FILES))
  635. try:
  636. for i in extant:
  637. extant[i] = self.do_create(pre="aa")
  638. finally:
  639. for i in extant:
  640. if(isinstance(i, str)):
  641. os.rmdir(i)
  642. def test_choose_directory(self):
  643. # mkdtemp can create directories in a user-selected directory
  644. dir = tempfile.mkdtemp()
  645. try:
  646. os.rmdir(self.do_create(dir=dir))
  647. os.rmdir(self.do_create(dir=pathlib.Path(dir)))
  648. finally:
  649. os.rmdir(dir)
  @os_helper.skip_unless_working_chmod
  def test_mode(self):
      # mkdtemp creates directories with the proper mode
      path = self.do_create()
      try:
          # Keep only the permission bits; this masks off sticky bits
          # inherited from /tmp.
          actual = stat.S_IMODE(os.stat(path).st_mode) & 0o777
          if sys.platform == 'win32':
              # There's no distinction among 'user', 'group' and 'world';
              # replicate the 'user' bits.
              owner_bits = 0o700 >> 6
              wanted = owner_bits * (1 + 8 + 64)
          else:
              wanted = 0o700
          self.assertEqual(actual, wanted)
      finally:
          os.rmdir(path)
  def test_collision_with_existing_file(self):
      # mkdtemp tries another name when a file with
      # the chosen name already exists
      with _inside_empty_temp_dir():
          with _mock_candidate_names('aaa', 'aaa', 'bbb'):
              # The first candidate name is consumed by a regular file ...
              blocker = tempfile.NamedTemporaryFile(delete=False)
              blocker.close()
              self.assertTrue(blocker.name.endswith('aaa'))
              # ... so the directory has to move on to the next free name.
              made_dir = tempfile.mkdtemp()
              self.assertTrue(made_dir.endswith('bbb'))
  def test_collision_with_existing_directory(self):
      # mkdtemp tries another name when a directory with
      # the chosen name already exists
      with _inside_empty_temp_dir():
          with _mock_candidate_names('aaa', 'aaa', 'bbb'):
              first_dir = tempfile.mkdtemp()
              self.assertTrue(first_dir.endswith('aaa'))
              # 'aaa' is offered again but is taken, so 'bbb' is used.
              second_dir = tempfile.mkdtemp()
              self.assertTrue(second_dir.endswith('bbb'))
  def test_for_tempdir_is_bytes_issue40701_api_warts(self):
      def make_and_remove(**kwargs):
          # Create a directory with mkdtemp, delete it, and return its path.
          created = tempfile.mkdtemp(**kwargs)
          os.rmdir(created)
          return created

      saved_tempdir = tempfile.tempdir
      self.assertIsInstance(tempfile.tempdir, (str, type(None)))
      try:
          self.assertIsInstance(make_and_remove(), str)

          # Switch the module-level default to a bytes path.
          tempfile.tempdir = tempfile.gettempdirb()
          self.assertIsInstance(tempfile.tempdir, bytes)
          self.assertIsInstance(tempfile.gettempdir(), str)
          self.assertIsInstance(tempfile.gettempdirb(), bytes)

          # With no other argument the result follows the bytes default ...
          self.assertIsInstance(make_and_remove(), bytes)
          # ... while any explicit str argument yields a str path.
          self.assertIsInstance(make_and_remove(suffix='-dir'), str)
          self.assertIsInstance(
              make_and_remove(prefix='test-mkdtemp-'), str)
          self.assertIsInstance(
              make_and_remove(dir=tempfile.gettempdir()), str)
      finally:
          tempfile.tempdir = saved_tempdir
  class TestMktemp(BaseTestCase):
      """Test mktemp()."""

      # For safety, all use of mktemp must occur in a private directory.
      # We must also suppress the RuntimeWarning it generates.
      def setUp(self):
          self.dir = tempfile.mkdtemp()
          super().setUp()

      def tearDown(self):
          try:
              if self.dir:
                  os.rmdir(self.dir)
                  self.dir = None
          finally:
              # Always run the base-class teardown, even when the private
              # directory could not be removed.
              super().tearDown()

      class mktemped:
          """A file created at a mktemp()-chosen name, unlinked on deletion."""

          # Bound at class-creation time so that __del__ can still reach
          # them when it runs.
          _unlink = os.unlink
          _bflags = tempfile._bin_openflags

          def __init__(self, dir, pre, suf):
              self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
              # Create the file. This will raise an exception if it's
              # mysteriously appeared in the meanwhile.
              os.close(os.open(self.name, self._bflags, 0o600))

          def __del__(self):
              self._unlink(self.name)

      def do_create(self, pre="", suf=""):
          """Create a file in the private directory and check its name.

          Returns the ``mktemped`` wrapper; the file is unlinked when the
          wrapper is deleted.
          """
          file = self.mktemped(self.dir, pre, suf)
          self.nameCheck(file.name, self.dir, pre, suf)
          return file

      def test_basic(self):
          # mktemp can choose usable file names
          self.do_create()
          self.do_create(pre="a")
          self.do_create(suf="b")
          self.do_create(pre="a", suf="b")
          self.do_create(pre="aa", suf=".txt")
          # The files are unlinked by mktemped.__del__; force a collection
          # so they are gone before tearDown() removes the directory.
          support.gc_collect()  # For PyPy or other GCs.

      def test_many(self):
          # mktemp can choose many usable file names (stochastic)
          extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]
          del extant
          support.gc_collect()  # For PyPy or other GCs.

  ##    def test_warning(self):
  ##        # mktemp issues a warning when used
  ##        warnings.filterwarnings("error",
  ##                                category=RuntimeWarning,
  ##                                message="mktemp")
  ##        self.assertRaises(RuntimeWarning,
  ##                          tempfile.mktemp, dir=self.dir)
  757. # We test _TemporaryFileWrapper by testing NamedTemporaryFile.
  class TestNamedTemporaryFile(BaseTestCase):
      """Test NamedTemporaryFile()."""

      def do_create(self, dir=None, pre="", suf="", delete=True):
          # Fall back to the default temporary directory when none is given.
          target_dir = tempfile.gettempdir() if dir is None else dir
          handle = tempfile.NamedTemporaryFile(
              dir=target_dir, prefix=pre, suffix=suf, delete=delete)
          self.nameCheck(handle.name, target_dir, pre, suf)
          return handle

      def test_basic(self):
          # NamedTemporaryFile can create files
          name_parts = (
              {},
              {"pre": "a"},
              {"suf": "b"},
              {"pre": "a", "suf": "b"},
              {"pre": "aa", "suf": ".txt"},
          )
          for parts in name_parts:
              self.do_create(**parts)

      def test_method_lookup(self):
          # Issue #18879: Looking up a temporary file method should keep it
          # alive long enough.
          tmp_file = self.do_create()
          file_ref = weakref.ref(tmp_file)
          first_write = tmp_file.write
          second_write = tmp_file.write
          del tmp_file
          first_write(b'foo')
          del first_write
          second_write(b'bar')
          del second_write
          if support.check_impl_detail(cpython=True):
              # No reference cycle was created.
              self.assertIsNone(file_ref())

      def test_iter(self):
          # Issue #23700: getting iterator from a temporary file should keep
          # it alive as long as it's being iterated over
          expected = [b'spam\n', b'eggs\n', b'beans\n']

          def make_file():
              tmp_file = tempfile.NamedTemporaryFile(mode='w+b')
              tmp_file.write(b''.join(expected))
              tmp_file.seek(0)
              return tmp_file

          # The file object is deliberately not bound to a name here.
          for index, line in enumerate(make_file()):
              self.assertEqual(line, expected[index])
          self.assertEqual(index, len(expected) - 1)

      def test_creates_named(self):
          # NamedTemporaryFile creates files with names
          tmp_file = tempfile.NamedTemporaryFile()
          self.assertTrue(
              os.path.exists(tmp_file.name),
              "NamedTemporaryFile %s does not exist" % tmp_file.name)

      def test_del_on_close(self):
          # A NamedTemporaryFile is deleted when closed
          parent = tempfile.mkdtemp()
          try:
              with tempfile.NamedTemporaryFile(dir=parent) as tmp_file:
                  tmp_file.write(b'blat')
              self.assertEqual(os.listdir(parent), [])
              self.assertFalse(
                  os.path.exists(tmp_file.name),
                  "NamedTemporaryFile %s exists after close" % tmp_file.name)
          finally:
              os.rmdir(parent)

      def test_dis_del_on_close(self):
          # Tests that delete-on-close can be disabled
          parent = tempfile.mkdtemp()
          leftover = None
          try:
              tmp_file = tempfile.NamedTemporaryFile(dir=parent, delete=False)
              leftover = tmp_file.name
              tmp_file.write(b'blat')
              tmp_file.close()
              self.assertTrue(
                  os.path.exists(tmp_file.name),
                  "NamedTemporaryFile %s missing after close" % tmp_file.name)
          finally:
              # The file survived close(), so it has to be removed by hand.
              if leftover is not None:
                  os.unlink(leftover)
              os.rmdir(parent)

      def test_multiple_close(self):
          # A NamedTemporaryFile can be closed many times without error
          tmp_file = tempfile.NamedTemporaryFile()
          tmp_file.write(b'abc\n')
          for _ in range(3):
              tmp_file.close()

      def test_context_manager(self):
          # A NamedTemporaryFile can be used as a context manager
          with tempfile.NamedTemporaryFile() as tmp_file:
              self.assertTrue(os.path.exists(tmp_file.name))
          self.assertFalse(os.path.exists(tmp_file.name))
          # Re-entering the already closed file must fail.
          with self.assertRaises(ValueError):
              with tmp_file:
                  pass

      def test_bad_mode(self):
          parent = tempfile.mkdtemp()
          self.addCleanup(os_helper.rmtree, parent)
          rejected = ((ValueError, 'wr'), (TypeError, 2))
          for error_type, bad_mode in rejected:
              with self.assertRaises(error_type):
                  tempfile.NamedTemporaryFile(mode=bad_mode, dir=parent)
          # Nothing may be left behind by the failed attempts.
          self.assertEqual(os.listdir(parent), [])

      def test_bad_encoding(self):
          parent = tempfile.mkdtemp()
          self.addCleanup(os_helper.rmtree, parent)
          with self.assertRaises(LookupError):
              tempfile.NamedTemporaryFile(
                  'w', encoding='bad-encoding', dir=parent)
          self.assertEqual(os.listdir(parent), [])

      def test_unexpected_error(self):
          parent = tempfile.mkdtemp()
          self.addCleanup(os_helper.rmtree, parent)
          wrapper_patch = mock.patch('tempfile._TemporaryFileWrapper')
          open_patch = mock.patch('io.open', mock.mock_open())
          with wrapper_patch as mock_ntf, open_patch as mock_open:
              mock_ntf.side_effect = KeyboardInterrupt()
              with self.assertRaises(KeyboardInterrupt):
                  tempfile.NamedTemporaryFile(dir=parent)
          # The underlying file object must have been closed on the way out.
          mock_open().close.assert_called()
          self.assertEqual(os.listdir(parent), [])

      # How to test the mode and bufsize parameters?
  class TestSpooledTemporaryFile(BaseTestCase):
      """Test SpooledTemporaryFile()."""

      def do_create(self, max_size=0, dir=None, pre="", suf=""):
          # Fall back to the default temporary directory when none is given.
          target_dir = tempfile.gettempdir() if dir is None else dir
          return tempfile.SpooledTemporaryFile(
              max_size=max_size, dir=target_dir, prefix=pre, suffix=suf)

      def test_basic(self):
          # SpooledTemporaryFile can create files
          spool = self.do_create()
          self.assertFalse(spool._rolled)
          spool = self.do_create(max_size=100, pre="a", suf=".txt")
          self.assertFalse(spool._rolled)

      def test_is_iobase(self):
          # SpooledTemporaryFile should implement io.IOBase
          spool = self.do_create()
          self.assertIsInstance(spool, io.IOBase)

      def test_iobase_interface(self):
          # SpooledTemporaryFile should implement the io.IOBase interface.
          # Ensure it has all the required methods and properties.
          required = {
              # From IOBase
              'fileno', 'seek', 'truncate', 'close', 'closed', '__enter__',
              '__exit__', 'flush', 'isatty', '__iter__', '__next__',
              'readable', 'readline', 'readlines', 'seekable', 'tell',
              'writable', 'writelines',
              # From BufferedIOBase (binary mode) and TextIOBase (text mode)
              'detach', 'read', 'read1', 'write', 'readinto', 'readinto1',
              'encoding', 'errors', 'newlines',
          }
          missing = required.difference(dir(tempfile.SpooledTemporaryFile))
          self.assertFalse(
              missing,
              'SpooledTemporaryFile missing attributes from IOBase/BufferedIOBase/TextIOBase'
          )

      def test_del_on_close(self):
          # A SpooledTemporaryFile is deleted when closed
          parent = tempfile.mkdtemp()
          try:
              spool = tempfile.SpooledTemporaryFile(max_size=10, dir=parent)
              self.assertFalse(spool._rolled)
              spool.write(b'blat ' * 5)
              self.assertTrue(spool._rolled)
              rolled_name = spool.name
              spool.close()
              self.assertEqual(os.listdir(parent), [])
              # The name may be an int rather than a path; only a path can
              # be checked for existence.
              if not isinstance(rolled_name, int):
                  self.assertFalse(
                      os.path.exists(rolled_name),
                      "SpooledTemporaryFile %s exists after close"
                      % rolled_name)
          finally:
              os.rmdir(parent)

      def test_del_unrolled_file(self):
          # The unrolled SpooledTemporaryFile should raise a ResourceWarning
          # when deleted since the file was not explicitly closed.
          spool = self.do_create(max_size=10)
          spool.write(b'foo')
          self.assertEqual(spool.name, None)  # Unrolled so no filename/fd
          with self.assertWarns(ResourceWarning):
              spool.__del__()

      @unittest.skipIf(
          support.is_emscripten, "Emscripten cannot fstat renamed files."
      )
      def test_del_rolled_file(self):
          # The rolled file should be deleted when the SpooledTemporaryFile
          # object is deleted. This should raise a ResourceWarning since the
          # file was not explicitly closed.
          spool = self.do_create(max_size=2)
          spool.write(b'foo')
          # This is a fd on posix+cygwin, a filename everywhere else
          rolled_name = spool.name
          self.assertTrue(os.path.exists(rolled_name))
          with self.assertWarns(ResourceWarning):
              spool.__del__()
          self.assertFalse(
              os.path.exists(rolled_name),
              "Rolled SpooledTemporaryFile (name=%s) exists after delete"
              % rolled_name
          )

      def test_rewrite_small(self):
          # A SpooledTemporaryFile can be written to multiple within the
          # max_size
          spool = self.do_create(max_size=30)
          self.assertFalse(spool._rolled)
          for _ in range(5):
              spool.seek(0, 0)
              spool.write(b'x' * 20)
          self.assertFalse(spool._rolled)

      def test_write_sequential(self):
          # A SpooledTemporaryFile should hold exactly max_size bytes, and
          # roll over afterward
          spool = self.do_create(max_size=30)
          self.assertFalse(spool._rolled)
          for chunk_len in (20, 10):
              spool.write(b'x' * chunk_len)
              self.assertFalse(spool._rolled)
          # One byte past max_size triggers the rollover.
          spool.write(b'x')
          self.assertTrue(spool._rolled)

      def test_writelines(self):
          # Verify writelines with a SpooledTemporaryFile
          spool = self.do_create()
          spool.writelines((b'x', b'y', b'z'))
          position = spool.seek(0)
          self.assertEqual(position, 0)
          content = spool.read()
          self.assertEqual(content, b'xyz')

      def test_writelines_sequential(self):
          # A SpooledTemporaryFile should hold exactly max_size bytes, and
          # roll over afterward
          spool = self.do_create(max_size=35)
          spool.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
          self.assertFalse(spool._rolled)
          spool.write(b'x')
          self.assertTrue(spool._rolled)

      def test_sparse(self):
          # A SpooledTemporaryFile that is written late in the file will
          # extend when that occurs
          spool = self.do_create(max_size=30)
          self.assertFalse(spool._rolled)
          position = spool.seek(100, 0)
          self.assertEqual(position, 100)
          # Seeking alone does not roll over; the write does.
          self.assertFalse(spool._rolled)
          spool.write(b'x')
          self.assertTrue(spool._rolled)

      def test_fileno(self):
          # A SpooledTemporaryFile should roll over to a real file on
          # fileno()
          spool = self.do_create(max_size=30)
          self.assertFalse(spool._rolled)
          descriptor = spool.fileno()
          self.assertTrue(descriptor > 0)
          self.assertTrue(spool._rolled)

      def test_multiple_close_before_rollover(self):
          # A SpooledTemporaryFile can be closed many times without error
          spool = tempfile.SpooledTemporaryFile()
          spool.write(b'abc\n')
          self.assertFalse(spool._rolled)
          for _ in range(3):
              spool.close()

      def test_multiple_close_after_rollover(self):
          # A SpooledTemporaryFile can be closed many times without error
          spool = tempfile.SpooledTemporaryFile(max_size=1)
          spool.write(b'abc\n')
          self.assertTrue(spool._rolled)
          for _ in range(3):
              spool.close()

      def test_bound_methods(self):
          # It should be OK to steal a bound method from a
          # SpooledTemporaryFile and use it independently; when the file
          # rolls over, those bound methods should continue to function
          spool = self.do_create(max_size=30)
          bound_read = spool.read
          bound_write = spool.write
          bound_seek = spool.seek

          bound_write(b"a" * 35)
          bound_write(b"b" * 35)
          bound_seek(0, 0)
          self.assertEqual(bound_read(70), b'a'*35 + b'b'*35)

      def test_properties(self):
          text_only_attrs = ('newlines', 'encoding', 'errors')

          spool = tempfile.SpooledTemporaryFile(max_size=10)
          spool.write(b'x' * 10)
          self.assertFalse(spool._rolled)
          self.assertEqual(spool.mode, 'w+b')
          self.assertIsNone(spool.name)
          # Text-only attributes are unavailable in binary mode.
          for attr in text_only_attrs:
              with self.assertRaises(AttributeError):
                  getattr(spool, attr)

          spool.write(b'x')
          self.assertTrue(spool._rolled)
          self.assertEqual(spool.mode, 'rb+')
          self.assertIsNotNone(spool.name)
          for attr in text_only_attrs:
              with self.assertRaises(AttributeError):
                  getattr(spool, attr)

      def test_text_mode(self):
          # Creating a SpooledTemporaryFile with a text mode should produce
          # a file object reading and writing (Unicode) text strings.
          spool = tempfile.SpooledTemporaryFile(
              mode='w+', max_size=10, encoding="utf-8")

          def read_all():
              # Rewind and return the whole content.
              spool.seek(0)
              return spool.read()

          spool.write("abc\n")
          self.assertEqual(read_all(), "abc\n")
          spool.write("def\n")
          self.assertEqual(read_all(), "abc\ndef\n")
          self.assertFalse(spool._rolled)
          self.assertEqual(spool.mode, 'w+')
          self.assertIsNone(spool.name)
          self.assertEqual(spool.newlines, os.linesep)
          self.assertEqual(spool.encoding, "utf-8")
          self.assertEqual(spool.errors, "strict")

          spool.write("xyzzy\n")
          self.assertEqual(read_all(), "abc\ndef\nxyzzy\n")
          # Check that Ctrl+Z doesn't truncate the file
          spool.write("foo\x1abar\n")
          self.assertEqual(read_all(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
          self.assertTrue(spool._rolled)
          self.assertEqual(spool.mode, 'w+')
          self.assertIsNotNone(spool.name)
          self.assertEqual(spool.newlines, os.linesep)
          self.assertEqual(spool.encoding, "utf-8")
          self.assertEqual(spool.errors, "strict")

      def test_text_newline_and_encoding(self):
          spool = tempfile.SpooledTemporaryFile(
              mode='w+', max_size=10, newline='', encoding='utf-8',
              errors='ignore')
          spool.write("\u039B\r\n")
          spool.seek(0)
          self.assertEqual(spool.read(), "\u039B\r\n")
          self.assertFalse(spool._rolled)
          self.assertEqual(spool.mode, 'w+')
          self.assertIsNone(spool.name)
          self.assertIsNotNone(spool.newlines)
          self.assertEqual(spool.encoding, "utf-8")
          self.assertEqual(spool.errors, "ignore")

          spool.write("\u039C" * 10 + "\r\n")
          spool.write("\u039D" * 20)
          spool.seek(0)
          whole_text = "\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20)
          self.assertEqual(spool.read(), whole_text)
          self.assertTrue(spool._rolled)
          self.assertEqual(spool.mode, 'w+')
          self.assertIsNotNone(spool.name)
          self.assertIsNotNone(spool.newlines)
          self.assertEqual(spool.encoding, 'utf-8')
          self.assertEqual(spool.errors, 'ignore')

      def test_context_manager_before_rollover(self):
          # A SpooledTemporaryFile can be used as a context manager
          with tempfile.SpooledTemporaryFile(max_size=1) as spool:
              self.assertFalse(spool._rolled)
              self.assertFalse(spool.closed)
          self.assertTrue(spool.closed)
          # Re-entering the already closed file must fail.
          with self.assertRaises(ValueError):
              with spool:
                  pass

      def test_context_manager_during_rollover(self):
          # A SpooledTemporaryFile can be used as a context manager
          with tempfile.SpooledTemporaryFile(max_size=1) as spool:
              self.assertFalse(spool._rolled)
              spool.write(b'abc\n')
              spool.flush()
              self.assertTrue(spool._rolled)
              self.assertFalse(spool.closed)
          self.assertTrue(spool.closed)
          # Re-entering the already closed file must fail.
          with self.assertRaises(ValueError):
              with spool:
                  pass

      def test_context_manager_after_rollover(self):
          # A SpooledTemporaryFile can be used as a context manager
          spool = tempfile.SpooledTemporaryFile(max_size=1)
          spool.write(b'abc\n')
          spool.flush()
          self.assertTrue(spool._rolled)
          with spool:
              self.assertFalse(spool.closed)
          self.assertTrue(spool.closed)
          # Re-entering the already closed file must fail.
          with self.assertRaises(ValueError):
              with spool:
                  pass

      @unittest.skipIf(
          support.is_emscripten, "Emscripten cannot fstat renamed files."
      )
      def test_truncate_with_size_parameter(self):
          payload = b'abcdefg\n'

          # A SpooledTemporaryFile can be truncated to zero size
          spool = tempfile.SpooledTemporaryFile(max_size=10)
          spool.write(payload)
          spool.seek(0)
          spool.truncate()
          self.assertFalse(spool._rolled)
          self.assertEqual(spool._file.getvalue(), b'')

          # A SpooledTemporaryFile can be truncated to a specific size
          spool = tempfile.SpooledTemporaryFile(max_size=10)
          spool.write(payload)
          spool.truncate(4)
          self.assertFalse(spool._rolled)
          self.assertEqual(spool._file.getvalue(), b'abcd')

          # A SpooledTemporaryFile rolls over if truncated to large size
          spool = tempfile.SpooledTemporaryFile(max_size=10)
          spool.write(payload)
          spool.truncate(20)
          self.assertTrue(spool._rolled)
          self.assertEqual(os.fstat(spool.fileno()).st_size, 20)

      def test_class_getitem(self):
          alias = tempfile.SpooledTemporaryFile[bytes]
          self.assertIsInstance(alias, types.GenericAlias)
  # TestTemporaryFile is only defined where TemporaryFile is a different
  # object from NamedTemporaryFile.
  if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:

      class TestTemporaryFile(BaseTestCase):
          """Test TemporaryFile()."""

          def test_basic(self):
              # TemporaryFile can create files
              # No point in testing the name params - the file has no name.
              # Close the file explicitly instead of leaving it to the
              # garbage collector.
              with tempfile.TemporaryFile():
                  pass

          def test_has_no_name(self):
              # TemporaryFile creates files with no names (on this system)
              dir = tempfile.mkdtemp()
              try:
                  f = tempfile.TemporaryFile(dir=dir)
              except BaseException:
                  # Do not leave the directory behind if creation fails.
                  os.rmdir(dir)
                  raise
              # Make sure the file is closed on every exit path; closing
              # an already closed file is harmless.
              self.addCleanup(f.close)
              f.write(b'blat')

              # Sneaky: because this file has no name, it should not prevent
              # us from removing the directory it was created in.
              try:
                  os.rmdir(dir)
              except BaseException:
                  # cleanup
                  f.close()
                  os.rmdir(dir)
                  raise

          def test_multiple_close(self):
              # A TemporaryFile can be closed many times without error
              f = tempfile.TemporaryFile()
              f.write(b'abc\n')
              f.close()
              f.close()
              f.close()

          # How to test the mode and bufsize parameters?
          def test_mode_and_encoding(self):

              def roundtrip(input, *args, **kwargs):
                  # Write `input`, rewind, and check it reads back unchanged.
                  with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
                      fileobj.write(input)
                      fileobj.seek(0)
                      self.assertEqual(input, fileobj.read())

              roundtrip(b"1234", "w+b")
              roundtrip("abdc\n", "w+")
              roundtrip("\u039B", "w+", encoding="utf-16")
              roundtrip("foo\r\n", "w+", newline="")

          def test_bad_mode(self):
              dir = tempfile.mkdtemp()
              self.addCleanup(os_helper.rmtree, dir)
              with self.assertRaises(ValueError):
                  tempfile.TemporaryFile(mode='wr', dir=dir)
              with self.assertRaises(TypeError):
                  tempfile.TemporaryFile(mode=2, dir=dir)
              # The failed attempts must not leave anything behind.
              self.assertEqual(os.listdir(dir), [])

          def test_bad_encoding(self):
              dir = tempfile.mkdtemp()
              self.addCleanup(os_helper.rmtree, dir)
              with self.assertRaises(LookupError):
                  tempfile.TemporaryFile('w', encoding='bad-encoding', dir=dir)
              self.assertEqual(os.listdir(dir), [])

          def test_unexpected_error(self):
              dir = tempfile.mkdtemp()
              self.addCleanup(os_helper.rmtree, dir)
              with mock.patch('tempfile._O_TMPFILE_WORKS', False), \
                   mock.patch('os.unlink') as mock_unlink, \
                   mock.patch('os.open') as mock_open, \
                   mock.patch('os.close') as mock_close:
                  # Simulate an interrupt arriving while the file is unlinked.
                  mock_unlink.side_effect = KeyboardInterrupt()
                  with self.assertRaises(KeyboardInterrupt):
                      tempfile.TemporaryFile(dir=dir)
              mock_close.assert_called()
              self.assertEqual(os.listdir(dir), [])
  1231. # Helper for test_del_on_shutdown
  class NulledModules:
      """Context manager that temporarily sets every module global to None.

      On entry each name in the given modules' namespaces is rebound to
      None; on exit the namespaces are restored from copies taken when the
      manager was constructed.
      """

      def __init__(self, *modules):
          namespaces = []
          snapshots = []
          for module in modules:
              namespaces.append(module.__dict__)
              snapshots.append(module.__dict__.copy())
          # Live namespace dicts, and the copies used to restore them.
          self.refs = namespaces
          self.contents = snapshots

      def __enter__(self):
          for namespace in self.refs:
              # Rebind every existing name to None without adding or
              # removing keys.
              namespace.update(dict.fromkeys(namespace))

      def __exit__(self, *exc_info):
          for index, namespace in enumerate(self.refs):
              namespace.clear()
              namespace.update(self.contents[index])
  1244. class TestTemporaryDirectory(BaseTestCase):
  1245. """Test TemporaryDirectory()."""
  1246. def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1,
  1247. ignore_cleanup_errors=False):
  1248. if dir is None:
  1249. dir = tempfile.gettempdir()
  1250. tmp = tempfile.TemporaryDirectory(
  1251. dir=dir, prefix=pre, suffix=suf,
  1252. ignore_cleanup_errors=ignore_cleanup_errors)
  1253. self.nameCheck(tmp.name, dir, pre, suf)
  1254. self.do_create2(tmp.name, recurse, dirs, files)
  1255. return tmp
  1256. def do_create2(self, path, recurse=1, dirs=1, files=1):
  1257. # Create subdirectories and some files
  1258. if recurse:
  1259. for i in range(dirs):
  1260. name = os.path.join(path, "dir%d" % i)
  1261. os.mkdir(name)
  1262. self.do_create2(name, recurse-1, dirs, files)
  1263. for i in range(files):
  1264. with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
  1265. f.write(b"Hello world!")
  1266. def test_mkdtemp_failure(self):
  1267. # Check no additional exception if mkdtemp fails
  1268. # Previously would raise AttributeError instead
  1269. # (noted as part of Issue #10188)
  1270. with tempfile.TemporaryDirectory() as nonexistent:
  1271. pass
  1272. with self.assertRaises(FileNotFoundError) as cm:
  1273. tempfile.TemporaryDirectory(dir=nonexistent)
  1274. self.assertEqual(cm.exception.errno, errno.ENOENT)
  1275. def test_explicit_cleanup(self):
  1276. # A TemporaryDirectory is deleted when cleaned up
  1277. dir = tempfile.mkdtemp()
  1278. try:
  1279. d = self.do_create(dir=dir)
  1280. self.assertTrue(os.path.exists(d.name),
  1281. "TemporaryDirectory %s does not exist" % d.name)
  1282. d.cleanup()
  1283. self.assertFalse(os.path.exists(d.name),
  1284. "TemporaryDirectory %s exists after cleanup" % d.name)
  1285. finally:
  1286. os.rmdir(dir)
  1287. def test_explict_cleanup_ignore_errors(self):
  1288. """Test that cleanup doesn't return an error when ignoring them."""
  1289. with tempfile.TemporaryDirectory() as working_dir:
  1290. temp_dir = self.do_create(
  1291. dir=working_dir, ignore_cleanup_errors=True)
  1292. temp_path = pathlib.Path(temp_dir.name)
  1293. self.assertTrue(temp_path.exists(),
  1294. f"TemporaryDirectory {temp_path!s} does not exist")
  1295. with open(temp_path / "a_file.txt", "w+t") as open_file:
  1296. open_file.write("Hello world!\n")
  1297. temp_dir.cleanup()
  1298. self.assertEqual(len(list(temp_path.glob("*"))),
  1299. int(sys.platform.startswith("win")),
  1300. "Unexpected number of files in "
  1301. f"TemporaryDirectory {temp_path!s}")
  1302. self.assertEqual(
  1303. temp_path.exists(),
  1304. sys.platform.startswith("win"),
  1305. f"TemporaryDirectory {temp_path!s} existence state unexpected")
  1306. temp_dir.cleanup()
  1307. self.assertFalse(
  1308. temp_path.exists(),
  1309. f"TemporaryDirectory {temp_path!s} exists after cleanup")
  1310. @os_helper.skip_unless_symlink
  1311. def test_cleanup_with_symlink_to_a_directory(self):
  1312. # cleanup() should not follow symlinks to directories (issue #12464)
  1313. d1 = self.do_create()
  1314. d2 = self.do_create(recurse=0)
  1315. # Symlink d1/foo -> d2
  1316. os.symlink(d2.name, os.path.join(d1.name, "foo"))
  1317. # This call to cleanup() should not follow the "foo" symlink
  1318. d1.cleanup()
  1319. self.assertFalse(os.path.exists(d1.name),
  1320. "TemporaryDirectory %s exists after cleanup" % d1.name)
  1321. self.assertTrue(os.path.exists(d2.name),
  1322. "Directory pointed to by a symlink was deleted")
  1323. self.assertEqual(os.listdir(d2.name), ['test0.txt'],
  1324. "Contents of the directory pointed to by a symlink "
  1325. "were deleted")
  1326. d2.cleanup()
  1327. @support.cpython_only
  1328. def test_del_on_collection(self):
  1329. # A TemporaryDirectory is deleted when garbage collected
  1330. dir = tempfile.mkdtemp()
  1331. try:
  1332. d = self.do_create(dir=dir)
  1333. name = d.name
  1334. del d # Rely on refcounting to invoke __del__
  1335. self.assertFalse(os.path.exists(name),
  1336. "TemporaryDirectory %s exists after __del__" % name)
  1337. finally:
  1338. os.rmdir(dir)
  1339. @support.cpython_only
  1340. def test_del_on_collection_ignore_errors(self):
  1341. """Test that ignoring errors works when TemporaryDirectory is gced."""
  1342. with tempfile.TemporaryDirectory() as working_dir:
  1343. temp_dir = self.do_create(
  1344. dir=working_dir, ignore_cleanup_errors=True)
  1345. temp_path = pathlib.Path(temp_dir.name)
  1346. self.assertTrue(temp_path.exists(),
  1347. f"TemporaryDirectory {temp_path!s} does not exist")
  1348. with open(temp_path / "a_file.txt", "w+t") as open_file:
  1349. open_file.write("Hello world!\n")
  1350. del temp_dir
  1351. self.assertEqual(len(list(temp_path.glob("*"))),
  1352. int(sys.platform.startswith("win")),
  1353. "Unexpected number of files in "
  1354. f"TemporaryDirectory {temp_path!s}")
  1355. self.assertEqual(
  1356. temp_path.exists(),
  1357. sys.platform.startswith("win"),
  1358. f"TemporaryDirectory {temp_path!s} existence state unexpected")
  1359. def test_del_on_shutdown(self):
  1360. # A TemporaryDirectory may be cleaned up during shutdown
  1361. with self.do_create() as dir:
  1362. for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'):
  1363. code = """if True:
  1364. import builtins
  1365. import os
  1366. import shutil
  1367. import sys
  1368. import tempfile
  1369. import warnings
  1370. tmp = tempfile.TemporaryDirectory(dir={dir!r})
  1371. sys.stdout.buffer.write(tmp.name.encode())
  1372. tmp2 = os.path.join(tmp.name, 'test_dir')
  1373. os.mkdir(tmp2)
  1374. with open(os.path.join(tmp2, "test0.txt"), "w") as f:
  1375. f.write("Hello world!")
  1376. {mod}.tmp = tmp
  1377. warnings.filterwarnings("always", category=ResourceWarning)
  1378. """.format(dir=dir, mod=mod)
  1379. rc, out, err = script_helper.assert_python_ok("-c", code)
  1380. tmp_name = out.decode().strip()
  1381. self.assertFalse(os.path.exists(tmp_name),
  1382. "TemporaryDirectory %s exists after cleanup" % tmp_name)
  1383. err = err.decode('utf-8', 'backslashreplace')
  1384. self.assertNotIn("Exception ", err)
  1385. self.assertIn("ResourceWarning: Implicitly cleaning up", err)
  1386. def test_del_on_shutdown_ignore_errors(self):
  1387. """Test ignoring errors works when a tempdir is gc'ed on shutdown."""
  1388. with tempfile.TemporaryDirectory() as working_dir:
  1389. code = """if True:
  1390. import pathlib
  1391. import sys
  1392. import tempfile
  1393. import warnings
  1394. temp_dir = tempfile.TemporaryDirectory(
  1395. dir={working_dir!r}, ignore_cleanup_errors=True)
  1396. sys.stdout.buffer.write(temp_dir.name.encode())
  1397. temp_dir_2 = pathlib.Path(temp_dir.name) / "test_dir"
  1398. temp_dir_2.mkdir()
  1399. with open(temp_dir_2 / "test0.txt", "w") as test_file:
  1400. test_file.write("Hello world!")
  1401. open_file = open(temp_dir_2 / "open_file.txt", "w")
  1402. open_file.write("Hello world!")
  1403. warnings.filterwarnings("always", category=ResourceWarning)
  1404. """.format(working_dir=working_dir)
  1405. __, out, err = script_helper.assert_python_ok("-c", code)
  1406. temp_path = pathlib.Path(out.decode().strip())
  1407. self.assertEqual(len(list(temp_path.glob("*"))),
  1408. int(sys.platform.startswith("win")),
  1409. "Unexpected number of files in "
  1410. f"TemporaryDirectory {temp_path!s}")
  1411. self.assertEqual(
  1412. temp_path.exists(),
  1413. sys.platform.startswith("win"),
  1414. f"TemporaryDirectory {temp_path!s} existence state unexpected")
  1415. err = err.decode('utf-8', 'backslashreplace')
  1416. self.assertNotIn("Exception", err)
  1417. self.assertNotIn("Error", err)
  1418. self.assertIn("ResourceWarning: Implicitly cleaning up", err)
  1419. def test_exit_on_shutdown(self):
  1420. # Issue #22427
  1421. with self.do_create() as dir:
  1422. code = """if True:
  1423. import sys
  1424. import tempfile
  1425. import warnings
  1426. def generator():
  1427. with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
  1428. yield tmp
  1429. g = generator()
  1430. sys.stdout.buffer.write(next(g).encode())
  1431. warnings.filterwarnings("always", category=ResourceWarning)
  1432. """.format(dir=dir)
  1433. rc, out, err = script_helper.assert_python_ok("-c", code)
  1434. tmp_name = out.decode().strip()
  1435. self.assertFalse(os.path.exists(tmp_name),
  1436. "TemporaryDirectory %s exists after cleanup" % tmp_name)
  1437. err = err.decode('utf-8', 'backslashreplace')
  1438. self.assertNotIn("Exception ", err)
  1439. self.assertIn("ResourceWarning: Implicitly cleaning up", err)
  def test_warnings_on_cleanup(self):
      """Implicit cleanup via ``__del__`` warns and removes the directory."""
      with self.do_create() as parent:
          tmpdir = self.do_create(dir=parent, recurse=3)
          path = tmpdir.name
          expected_warning = ('Implicitly', ResourceWarning)
          # Dropping the only reference and forcing a collection should
          # run __del__, which is where the ResourceWarning comes from.
          with warnings_helper.check_warnings(expected_warning, quiet=False):
              warnings.filterwarnings("always", category=ResourceWarning)
              del tmpdir
              support.gc_collect()
          still_there = os.path.exists(path)
          self.assertFalse(
              still_there,
              "TemporaryDirectory %s exists after __del__" % path)
  def test_multiple_close(self):
      """cleanup() may be called repeatedly without raising."""
      tmpdir = self.do_create()
      # Three calls in a row: the first does the work, the rest must be
      # harmless.
      for _ in range(3):
          tmpdir.cleanup()
  def test_context_manager(self):
      """The object works as a context manager that yields its ``name``."""
      tmpdir = self.do_create()
      with tmpdir as entered_path:
          exists_inside = os.path.exists(entered_path)
          self.assertTrue(exists_inside)
          self.assertEqual(entered_path, tmpdir.name)
      # Once the block has been left the directory must be gone.
      exists_after = os.path.exists(entered_path)
      self.assertFalse(exists_after)
  def test_modes(self):
      """Cleanup succeeds for every combination of owner permission bits."""
      # 0..7 shifted left by 6 gives the modes 0o000, 0o100, ..., 0o700.
      for owner_bits in range(8):
          mode = owner_bits << 6
          with self.subTest(mode=format(mode, '03o')):
              tmpdir = self.do_create(recurse=3, dirs=2, files=2)
              with tmpdir:
                  # Apply the mode to the whole tree.  Walking bottom-up
                  # means a directory is changed only after everything
                  # inside it has been handled.
                  tree = os.walk(tmpdir.name, topdown=False)
                  for dirpath, _dirnames, filenames in tree:
                      for filename in filenames:
                          os.chmod(os.path.join(dirpath, filename), mode)
                      os.chmod(dirpath, mode)
                  tmpdir.cleanup()
              self.assertFalse(os.path.exists(tmpdir.name))
  # The skip reason names os.chflags, the function that is actually
  # checked for and called below.
  @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
  def test_flags(self):
      """Cleanup removes a tree whose entries carry restrictive file flags.

      Every file and directory in a freshly created tree is given
      UF_IMMUTABLE | UF_NOUNLINK before cleanup() is called; the
      directory must no longer exist afterwards.
      """
      flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
      d = self.do_create(recurse=3, dirs=2, files=2)
      with d:
          # Change files and directories flags recursively.  The walk is
          # bottom-up so a directory is flagged only after its contents.
          for root, _dirs, files in os.walk(d.name, topdown=False):
              for name in files:
                  os.chflags(os.path.join(root, name), flags)
              os.chflags(root, flags)
          d.cleanup()
      self.assertFalse(os.path.exists(d.name))
  # Run the test suite when this file is executed directly as a script.
  if __name__ == "__main__":
      unittest.main()