test_threading.py 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760
  1. """
  2. Tests for the threading module.
  3. """
  4. import test.support
  5. from test.support import threading_helper, requires_subprocess
  6. from test.support import verbose, cpython_only, os_helper
  7. from test.support.import_helper import import_module
  8. from test.support.script_helper import assert_python_ok, assert_python_failure
  9. import random
  10. import sys
  11. import _thread
  12. import threading
  13. import time
  14. import unittest
  15. import weakref
  16. import os
  17. import subprocess
  18. import signal
  19. import textwrap
  20. import traceback
  21. from unittest import mock
  22. from test import lock_tests
  23. from test import support
  # Module-wide precondition check; with module=True the helper acts on the
  # whole test module rather than on a single test.
  threading_helper.requires_working_threading(module=True)

  # Only async-safe functions may be called between fork() and exec()
  # (issues #12316 and #11870), and calling fork() from a worker thread is
  # known to misbehave on some operating systems (issue #3863).  Tests that
  # do so are skipped on the platforms named here.
  platforms_to_skip = ('netbsd5', 'hp-ux11')

  # True when sys exposes gettotalrefcount(), which is used here as the
  # marker for an interpreter built with the Py_DEBUG macro.
  Py_DEBUG = hasattr(sys, 'gettotalrefcount')
  32. def restore_default_excepthook(testcase):
  33. testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
  34. threading.excepthook = threading.__excepthook__
  # A minimal mutable integer cell.  It does no locking of its own; callers
  # that share one between threads must serialise access themselves.
  class Counter(object):
      def __init__(self):
          # Every counter starts at zero.
          self.value = 0

      def inc(self):
          # Step the count up by one.
          self.value = self.value + 1

      def dec(self):
          # Step the count down by one.
          self.value = self.value - 1

      def get(self):
          # Report the current count.
          return self.value
  class TestThread(threading.Thread):
      # Worker used by test_various_ops: it enters the shared semaphore,
      # bumps a shared counter under the mutex, sleeps for a short random
      # delay, then decrements the counter again.

      def __init__(self, name, testcase, sema, mutex, nrunning):
          super().__init__(name=name)
          self.testcase = testcase    # TestCase whose assert* methods are used
          self.sema = sema            # bounds how many workers run at once
          self.mutex = mutex          # guards updates to nrunning
          self.nrunning = nrunning    # shared Counter of active workers

      def run(self):
          delay = random.random() / 10000.0
          if verbose:
              print('task %s will run for %.1f usec' %
                    (self.name, delay * 1e6))

          running = self.nrunning
          with self.sema:
              # Announce that this worker is active and check the limit.
              with self.mutex:
                  running.inc()
                  if verbose:
                      print(running.get(), 'tasks are running')
                  self.testcase.assertLessEqual(running.get(), 3)

              time.sleep(delay)
              if verbose:
                  print('task', self.name, 'done')

              # Retire this worker; the count must never go negative.
              with self.mutex:
                  running.dec()
                  self.testcase.assertGreaterEqual(running.get(), 0)
                  if verbose:
                      print('%s is finished. %d tasks are running' %
                            (self.name, running.get()))
  class BaseTestCase(unittest.TestCase):
      # Shared fixture: keep whatever threading_helper.threading_setup()
      # returns and hand it back to threading_cleanup() after each test,
      # then reap any child processes left behind.

      def setUp(self):
          self._threads = threading_helper.threading_setup()

      def tearDown(self):
          saved_state = self._threads
          threading_helper.threading_cleanup(*saved_state)
          test.support.reap_children()
  78. class ThreadTests(BaseTestCase):
  @cpython_only
  def test_name(self):
      # Check how Thread picks its name from the *name* and *target*
      # arguments.
      def func(): pass

      # An explicit name is used as given.
      self.assertEqual(threading.Thread(name="myname1").name, "myname1")

      # Convert int name to str
      self.assertEqual(threading.Thread(name=123).name, "123")

      # target name is ignored if name is specified
      named = threading.Thread(target=func, name="myname2")
      self.assertEqual(named.name, "myname2")

      # Generated names: (patched _counter value, Thread kwargs, expected).
      generated_cases = (
          (2, {"name": ""}, "Thread-2"),
          (3, {}, "Thread-3"),
          (5, {"target": func}, "Thread-5 (func)"),
      )
      for counter_value, kwargs, expected in generated_cases:
          with mock.patch.object(threading, '_counter',
                                 return_value=counter_value):
              thread = threading.Thread(**kwargs)
              self.assertEqual(thread.name, expected)
  99. def test_args_argument(self):
  100. # bpo-45735: Using list or tuple as *args* in constructor could
  101. # achieve the same effect.
  102. num_list = [1]
  103. num_tuple = (1,)
  104. str_list = ["str"]
  105. str_tuple = ("str",)
  106. list_in_tuple = ([1],)
  107. tuple_in_list = [(1,)]
  108. test_cases = (
  109. (num_list, lambda arg: self.assertEqual(arg, 1)),
  110. (num_tuple, lambda arg: self.assertEqual(arg, 1)),
  111. (str_list, lambda arg: self.assertEqual(arg, "str")),
  112. (str_tuple, lambda arg: self.assertEqual(arg, "str")),
  113. (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
  114. (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
  115. )
  116. for args, target in test_cases:
  117. with self.subTest(target=target, args=args):
  118. t = threading.Thread(target=target, args=args)
  119. t.start()
  120. t.join()
  @cpython_only
  def test_disallow_instantiation(self):
      # Ensure that the type disallows instantiation (bpo-43916)
      lock_type = type(threading.Lock())
      test.support.check_disallow_instantiation(self, lock_type)
  # Create a bunch of threads, let each do some work, wait until all are
  # done.
  def test_various_ops(self):
      # This takes about n/3 seconds to run (about n/3 clumps of tasks,
      # times about 1 second per clump).
      NUMTASKS = 10

      # no more than 3 of the 10 can run at once
      sema = threading.BoundedSemaphore(value=3)
      mutex = threading.RLock()
      numrunning = Counter()

      threads = []
      for index in range(NUMTASKS):
          worker = TestThread("<thread %d>"%index, self, sema, mutex,
                              numrunning)
          threads.append(worker)
          # Before start() the thread has no ident and reports "initial".
          self.assertIsNone(worker.ident)
          self.assertRegex(repr(worker), r'^<TestThread\(.*, initial\)>$')
          worker.start()

      if hasattr(threading, 'get_native_id'):
          # Every worker plus the current thread must have a distinct,
          # non-None native id.
          native_ids = {worker.native_id for worker in threads}
          native_ids.add(threading.get_native_id())
          self.assertNotIn(None, native_ids)
          self.assertEqual(len(native_ids), NUMTASKS + 1)

      if verbose:
          print('waiting for all tasks to complete')
      for worker in threads:
          worker.join()
          self.assertFalse(worker.is_alive())
          self.assertNotEqual(worker.ident, 0)
          self.assertIsNotNone(worker.ident)
          self.assertRegex(repr(worker),
                           r'^<TestThread\(.*, stopped -?\d+\)>$')
      if verbose:
          print('all tasks done')
      self.assertEqual(numrunning.get(), 0)
  def test_ident_of_no_threading_threads(self):
      # The ident still must work for the main thread and dummy threads.
      self.assertIsNotNone(threading.current_thread().ident)

      finished = threading.Event()
      seen_idents = []

      def record_ident():
          # Runs in a thread started through _thread, not threading.Thread.
          seen_idents.append(threading.current_thread().ident)
          finished.set()

      with threading_helper.wait_threads_exit():
          tid = _thread.start_new_thread(record_ident, ())
          finished.wait()
          self.assertEqual(seen_idents[0], tid)
      # Kill the "immortal" _DummyThread
      del threading._active[seen_idents[0]]
  # run with a small(ish) thread stack size (256 KiB)
  def test_various_ops_small_stack(self):
      # Re-run test_various_ops() with a 256 KiB thread stack.  The test is
      # skipped when the platform refuses to change the stack size.
      if verbose:
          print('with 256 KiB thread stack size...')
      try:
          threading.stack_size(262144)
      except _thread.error:
          raise unittest.SkipTest(
              'platform does not support changing thread stack size')
      try:
          self.test_various_ops()
      finally:
          # Restore the default stack size even when test_various_ops()
          # fails, so the custom size does not leak into later tests.
          threading.stack_size(0)
  # run with a large thread stack size (1 MiB)
  def test_various_ops_large_stack(self):
      # Re-run test_various_ops() with a 1 MiB thread stack.  The test is
      # skipped when the platform refuses to change the stack size.
      if verbose:
          print('with 1 MiB thread stack size...')
      try:
          threading.stack_size(0x100000)
      except _thread.error:
          raise unittest.SkipTest(
              'platform does not support changing thread stack size')
      try:
          self.test_various_ops()
      finally:
          # Restore the default stack size even when test_various_ops()
          # fails, so the custom size does not leak into later tests.
          threading.stack_size(0)
  def test_foreign_thread(self):
      # Check that a "foreign" thread can use the threading module.
      def foreign_body(lock):
          # Calling current_thread() forces an entry for the foreign
          # thread to get made in the threading._active map.
          threading.current_thread()
          lock.release()

      # The lock starts out held; the foreign thread releases it to signal
      # that it has run.
      mutex = threading.Lock()
      mutex.acquire()
      with threading_helper.wait_threads_exit():
          tid = _thread.start_new_thread(foreign_body, (mutex,))
          # Wait for the thread to finish.
          mutex.acquire()
      self.assertIn(tid, threading._active)
      dummy = threading._active[tid]
      self.assertIsInstance(dummy, threading._DummyThread)
      #Issue 29376
      self.assertTrue(dummy.is_alive())
      self.assertRegex(repr(dummy), '_DummyThread')
      del threading._active[tid]
  # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
  # exposed at the Python level. This test relies on ctypes to get at it.
  def test_PyThreadState_SetAsyncExc(self):
      ctypes = import_module("ctypes")

      def log(message):
          # Progress output, shown only in verbose mode.
          if verbose:
              print(message)

      set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
      set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)

      class AsyncExc(Exception):
          pass

      exception = ctypes.py_object(AsyncExc)

      # First check it works when setting the exception from the same thread.
      tid = threading.get_ident()
      self.assertIsInstance(tid, int)
      self.assertGreater(tid, 0)

      try:
          result = set_async_exc(tid, exception)
          # The exception is async, so we might have to keep the VM busy until
          # it notices.
          while True:
              pass
      except AsyncExc:
          pass
      else:
          # This code is unreachable but it reflects the intent. If we wanted
          # to be smarter the above loop wouldn't be infinite.
          self.fail("AsyncExc not raised")
      try:
          self.assertEqual(result, 1) # one thread state modified
      except UnboundLocalError:
          # The exception was raised too quickly for us to get the result.
          pass

      # `worker_started` is set by the thread when it's inside a try/except
      # block waiting to catch the asynchronously set AsyncExc exception.
      # `worker_saw_exception` is set by the thread upon catching that
      # exception.
      worker_started = threading.Event()
      worker_saw_exception = threading.Event()

      class Worker(threading.Thread):
          def run(self):
              self.id = threading.get_ident()
              self.finished = False

              try:
                  while True:
                      worker_started.set()
                      time.sleep(0.1)
              except AsyncExc:
                  self.finished = True
                  worker_saw_exception.set()

      worker = Worker()
      worker.daemon = True # so if this fails, we don't hang Python at shutdown
      worker.start()
      log(" started worker thread")

      # Try a thread id that doesn't make sense.
      log(" trying nonsensical thread id")
      result = set_async_exc(-1, exception)
      self.assertEqual(result, 0) # no thread states modified

      # Now raise an exception in the worker thread.
      log(" waiting for worker thread to get started")
      ret = worker_started.wait()
      self.assertTrue(ret)
      log(" verifying worker hasn't exited")
      self.assertFalse(worker.finished)
      log(" attempting to raise asynch exception in worker")
      result = set_async_exc(worker.id, exception)
      self.assertEqual(result, 1) # one thread state modified
      log(" waiting for worker to say it caught the exception")
      worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
      self.assertTrue(worker.finished)
      log(" all OK -- joining worker")
      if worker.finished:
          worker.join()
      # else the thread is still running, and we have no way to kill it
  def test_limbo_cleanup(self):
      # Issue 7481: Failure to start thread should cleanup the limbo map.
      def fail_new_thread(*args):
          raise threading.ThreadError()

      # Swap in the failing thread starter for the duration of the check;
      # the patcher restores the real one on exit, pass or fail.
      with mock.patch.object(threading, '_start_new_thread',
                             fail_new_thread):
          doomed = threading.Thread(target=lambda: None)
          self.assertRaises(threading.ThreadError, doomed.start)
          self.assertFalse(
              doomed in threading._limbo,
              "Failed to cleanup _limbo map on failure of Thread.start().")
  def test_finalize_running_thread(self):
      # Issue 1402: the PyGILState_Ensure / _Release functions may be called
      # very late on python exit: on deallocation of a running thread for
      # example.
      import_module("ctypes")

      script = """if 1:
          import ctypes, sys, time, _thread

          # This lock is used as a simple event variable.
          ready = _thread.allocate_lock()
          ready.acquire()

          # Module globals are cleared before __del__ is run
          # So we save the functions in class dict
          class C:
              ensure = ctypes.pythonapi.PyGILState_Ensure
              release = ctypes.pythonapi.PyGILState_Release
              def __del__(self):
                  state = self.ensure()
                  self.release(state)

          def waitingThread():
              x = C()
              ready.release()
              time.sleep(100)

          _thread.start_new_thread(waitingThread, ())
          ready.acquire() # Be sure the other thread is waiting.
          sys.exit(42)
          """
      # The child must exit with the status passed to sys.exit().
      exit_code, _out, _err = assert_python_failure("-c", script)
      self.assertEqual(exit_code, 42)
  def test_finalize_with_trace(self):
      # Issue1733757
      # Avoid a deadlock when sys.settrace steps into threading._shutdown
      script = """if 1:
          import sys, threading

          # A deadlock-killer, to prevent the
          # testsuite to hang forever
          def killer():
              import os, time
              time.sleep(2)
              print('program blocked; aborting')
              os._exit(2)
          t = threading.Thread(target=killer)
          t.daemon = True
          t.start()

          # This is the trace function
          def func(frame, event, arg):
              threading.current_thread()
              return func

          sys.settrace(func)
          """
      assert_python_ok("-c", script)
  def test_join_nondaemon_on_shutdown(self):
      # Issue 1722344
      # Raising SystemExit skipped threading._shutdown
      script = """if 1:
          import threading
          from time import sleep

          def child():
              sleep(1)
              # As a non-daemon thread we SHOULD wake up and nothing
              # should be torn down yet
              print("Woke up, sleep function is:", sleep)

          threading.Thread(target=child).start()
          raise SystemExit
          """
      _rc, stdout, stderr = assert_python_ok("-c", script)
      expected = b"Woke up, sleep function is: <built-in function sleep>"
      self.assertEqual(stdout.strip(), expected)
      self.assertEqual(stderr, b"")
  def test_enumerate_after_join(self):
      # Try hard to trigger #1703448: a thread is still returned in
      # threading.enumerate() after it has been join()ed.
      list_threads = threading.enumerate
      saved_interval = sys.getswitchinterval()
      try:
          # Sweep through a range of switch intervals.
          for trial in range(1, 100):
              sys.setswitchinterval(trial * 0.0002)
              worker = threading.Thread(target=lambda: None)
              worker.start()
              worker.join()
              alive = list_threads()
              self.assertNotIn(
                  worker, alive,
                  "#1703448 triggered after %d trials: %s" % (trial, alive))
      finally:
          sys.setswitchinterval(saved_interval)
  def test_no_refcycle_through_target(self):
      # An object that starts a thread on its own bound method, and passes
      # itself as an argument too, must become collectable once the thread
      # has finished, whether or not the target raised.
      class RunSelfFunction(object):
          def __init__(self, should_raise):
              # The links in this refcycle from Thread back to self
              # should be cleaned up when the thread completes.
              self.should_raise = should_raise
              self.thread = threading.Thread(target=self._run,
                                             args=(self,),
                                             kwargs={'yet_another':self})
              self.thread.start()

          def _run(self, other_ref, yet_another):
              if self.should_raise:
                  raise SystemExit

      restore_default_excepthook(self)

      # First the normally-returning target, then the raising one.
      for should_raise in (False, True):
          cyclic_object = RunSelfFunction(should_raise=should_raise)
          weak_cyclic_object = weakref.ref(cyclic_object)
          cyclic_object.thread.join()
          del cyclic_object
          self.assertIsNone(weak_cyclic_object(),
                            msg=('%d references still around' %
                                 sys.getrefcount(weak_cyclic_object())))
  414. def test_old_threading_api(self):
  415. # Just a quick sanity check to make sure the old method names are
  416. # still present
  417. t = threading.Thread()
  418. with self.assertWarnsRegex(DeprecationWarning,
  419. r'get the daemon attribute'):
  420. t.isDaemon()
  421. with self.assertWarnsRegex(DeprecationWarning,
  422. r'set the daemon attribute'):
  423. t.setDaemon(True)
  424. with self.assertWarnsRegex(DeprecationWarning,
  425. r'get the name attribute'):
  426. t.getName()
  427. with self.assertWarnsRegex(DeprecationWarning,
  428. r'set the name attribute'):
  429. t.setName("name")
  430. e = threading.Event()
  431. with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
  432. e.isSet()
  433. cond = threading.Condition()
  434. cond.acquire()
  435. with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
  436. cond.notifyAll()
  437. with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
  438. threading.activeCount()
  439. with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
  440. threading.currentThread()
  441. def test_repr_daemon(self):
  442. t = threading.Thread()
  443. self.assertNotIn('daemon', repr(t))
  444. t.daemon = True
  445. self.assertIn('daemon', repr(t))
  446. def test_daemon_param(self):
  447. t = threading.Thread()
  448. self.assertFalse(t.daemon)
  449. t = threading.Thread(daemon=False)
  450. self.assertFalse(t.daemon)
  451. t = threading.Thread(daemon=True)
  452. self.assertTrue(t.daemon)
  @support.requires_fork()
  def test_fork_at_exit(self):
      # bpo-42350: Calling os.fork() after threading._shutdown() must
      # not log an error.
      script = textwrap.dedent("""
          import atexit
          import os
          import sys
          from test.support import wait_process

          # Import the threading module to register its "at fork" callback
          import threading

          def exit_handler():
              pid = os.fork()
              if not pid:
                  print("child process ok", file=sys.stderr, flush=True)
                  # child process
              else:
                  wait_process(pid, exitcode=0)

          # exit_handler() will be called after threading._shutdown()
          atexit.register(exit_handler)
      """)
      _rc, stdout, stderr = assert_python_ok("-c", script)
      # Only the child's message may show up, and only on stderr.
      self.assertEqual(stdout, b'')
      self.assertEqual(stderr.rstrip(), b'child process ok')
  @support.requires_fork()
  def test_dummy_thread_after_fork(self):
      # Issue #14308: a dummy thread in the active list doesn't mess up
      # the after-fork mechanism.
      script = """if 1:
          import _thread, threading, os, time

          def background_thread(evt):
              # Creates and registers the _DummyThread instance
              threading.current_thread()
              evt.set()
              time.sleep(10)

          evt = threading.Event()
          _thread.start_new_thread(background_thread, (evt,))
          evt.wait()
          assert threading.active_count() == 2, threading.active_count()
          if os.fork() == 0:
              assert threading.active_count() == 1, threading.active_count()
              os._exit(0)
          else:
              os.wait()
      """
      _rc, stdout, stderr = assert_python_ok("-c", script)
      self.assertEqual(stdout, b'')
      self.assertEqual(stderr, b'')
  @support.requires_fork()
  def test_is_alive_after_fork(self):
      # Try hard to trigger #18418: is_alive() could sometimes be True on
      # threads that vanished after a fork.
      self.addCleanup(sys.setswitchinterval, sys.getswitchinterval())

      # Make the bug more likely to manifest.
      test.support.setswitchinterval(1e-6)

      for _ in range(20):
          worker = threading.Thread(target=lambda: None)
          worker.start()
          pid = os.fork()
          if pid == 0:
              # Child: report through the exit status whether the thread
              # still looks alive (11) or not (10).  os._exit() never
              # returns, so the code below runs in the parent only.
              os._exit(11 if worker.is_alive() else 10)
          worker.join()
          support.wait_process(pid, exitcode=10)
  518. def test_main_thread(self):
  519. main = threading.main_thread()
  520. self.assertEqual(main.name, 'MainThread')
  521. self.assertEqual(main.ident, threading.current_thread().ident)
  522. self.assertEqual(main.ident, threading.get_ident())
  523. def f():
  524. self.assertNotEqual(threading.main_thread().ident,
  525. threading.current_thread().ident)
  526. th = threading.Thread(target=f)
  527. th.start()
  528. th.join()
  @support.requires_fork()
  @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
  def test_main_thread_after_fork(self):
      # In a child forked from the main thread, main_thread() must still
      # be 'MainThread' and must be the thread that is running.
      script = """if 1:
          import os, threading
          from test import support

          pid = os.fork()
          if pid == 0:
              main = threading.main_thread()
              print(main.name)
              print(main.ident == threading.current_thread().ident)
              print(main.ident == threading.get_ident())
          else:
              support.wait_process(pid, exitcode=0)
      """
      _rc, stdout, stderr = assert_python_ok("-c", script)
      printed = stdout.decode().replace('\r', '')
      self.assertEqual(stderr, b"")
      self.assertEqual(printed, "MainThread\nTrue\nTrue\n")
  @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
  @support.requires_fork()
  @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
  def test_main_thread_after_fork_from_nonmain_thread(self):
      # When the fork happens inside a worker thread, the child is expected
      # to report that worker ("Thread-1 (func)") as its main thread.
      script = """if 1:
          import os, threading, sys
          from test import support

          def func():
              pid = os.fork()
              if pid == 0:
                  main = threading.main_thread()
                  print(main.name)
                  print(main.ident == threading.current_thread().ident)
                  print(main.ident == threading.get_ident())
                  # stdout is fully buffered because not a tty,
                  # we have to flush before exit.
                  sys.stdout.flush()
              else:
                  support.wait_process(pid, exitcode=0)

          th = threading.Thread(target=func)
          th.start()
          th.join()
      """
      _rc, stdout, stderr = assert_python_ok("-c", script)
      printed = stdout.decode().replace('\r', '')
      self.assertEqual(stderr, b"")
      self.assertEqual(printed, "Thread-1 (func)\nTrue\nTrue\n")
  def test_main_thread_during_shutdown(self):
      # bpo-31516: current_thread() should still point to the main thread
      # at shutdown
      script = """if 1:
          import gc, threading

          main_thread = threading.current_thread()
          assert main_thread is threading.main_thread() # sanity check

          class RefCycle:
              def __init__(self):
                  self.cycle = self

              def __del__(self):
                  print("GC:",
                        threading.current_thread() is main_thread,
                        threading.main_thread() is main_thread,
                        threading.enumerate() == [main_thread])

          RefCycle()
          gc.collect() # sanity check
          x = RefCycle()
      """
      _rc, stdout, stderr = assert_python_ok("-c", script)
      self.assertEqual(stderr, b"")
      # One line from the explicit gc.collect(), one from the object that
      # is still alive when the script ends.
      expected_lines = ["GC: True True True"] * 2
      self.assertEqual(stdout.decode().splitlines(), expected_lines)
  def test_finalization_shutdown(self):
      # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
      # until Python thread states of all non-daemon threads get deleted.
      #
      # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
      # test the finalization of the main interpreter.
      script = """if 1:
          import os
          import threading
          import time
          import random

          def random_sleep():
              seconds = random.random() * 0.010
              time.sleep(seconds)

          class Sleeper:
              def __del__(self):
                  random_sleep()

          tls = threading.local()

          def f():
              # Sleep a bit so that the thread is still running when
              # Py_Finalize() is called.
              random_sleep()
              tls.x = Sleeper()
              random_sleep()

          threading.Thread(target=f).start()
          random_sleep()
      """
      _rc, _stdout, stderr = assert_python_ok("-c", script)
      self.assertEqual(stderr, b"")
  def test_tstate_lock(self):
      # Test an implementation detail of Thread objects.
      #
      # Two raw locks are used as hand-rolled events: `started` is released
      # by the thread once it is running, and `finish` is released by the
      # test to let the thread proceed to its end.
      started = _thread.allocate_lock()
      finish = _thread.allocate_lock()
      started.acquire()
      finish.acquire()
      def f():
          started.release()
          finish.acquire()
          time.sleep(0.01)
      # The tstate lock is None until the thread is started
      t = threading.Thread(target=f)
      self.assertIs(t._tstate_lock, None)
      t.start()
      started.acquire()
      self.assertTrue(t.is_alive())
      # The tstate lock can't be acquired when the thread is running
      # (or suspended).
      tstate_lock = t._tstate_lock
      self.assertFalse(tstate_lock.acquire(timeout=0))
      finish.release()
      # When the thread ends, the tstate lock can be successfully
      # acquired.
      self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT))
      # But is_alive() is still True: we hold _tstate_lock now, which
      # prevents is_alive() from knowing the thread's end-of-life C code
      # is done.
      self.assertTrue(t.is_alive())
      # Let is_alive() find out the C code is done.
      tstate_lock.release()
      self.assertFalse(t.is_alive())
      # And verify the thread disposed of _tstate_lock.
      self.assertIsNone(t._tstate_lock)
      t.join()
  def test_repr_stopped(self):
      # Verify that "stopped" shows up in repr(Thread) appropriately.
      started = _thread.allocate_lock()
      finish = _thread.allocate_lock()
      started.acquire()
      finish.acquire()

      def body():
          started.release()
          finish.acquire()

      worker = threading.Thread(target=body)
      worker.start()
      started.acquire()
      self.assertIn("started", repr(worker))
      finish.release()
      # "stopped" should appear in the repr in a reasonable amount of time.
      # Implementation detail: as of this writing, that's trivially true
      # if .join() is called, and almost trivially true if .is_alive() is
      # called. The detail we're testing here is that "stopped" shows up
      # "all on its own".
      LOOKING_FOR = "stopped"
      polls_left = 500
      while polls_left and LOOKING_FOR not in repr(worker):
          time.sleep(0.01)
          polls_left -= 1
      self.assertIn(LOOKING_FOR, repr(worker)) # we waited at least 5 seconds
      worker.join()
  688. def test_BoundedSemaphore_limit(self):
  689. # BoundedSemaphore should raise ValueError if released too often.
  690. for limit in range(1, 10):
  691. bs = threading.BoundedSemaphore(limit)
  692. threads = [threading.Thread(target=bs.acquire)
  693. for _ in range(limit)]
  694. for t in threads:
  695. t.start()
  696. for t in threads:
  697. t.join()
  698. threads = [threading.Thread(target=bs.release)
  699. for _ in range(limit)]
  700. for t in threads:
  701. t.start()
  702. for t in threads:
  703. t.join()
  704. self.assertRaises(ValueError, bs.release)
  705. @cpython_only
  706. def test_frame_tstate_tracing(self):
  707. # Issue #14432: Crash when a generator is created in a C thread that is
  708. # destroyed while the generator is still used. The issue was that a
  709. # generator contains a frame, and the frame kept a reference to the
  710. # Python state of the destroyed C thread. The crash occurs when a trace
  711. # function is setup.
  712. def noop_trace(frame, event, arg):
  713. # no operation
  714. return noop_trace
  715. def generator():
  716. while 1:
  717. yield "generator"
  718. def callback():
  719. if callback.gen is None:
  720. callback.gen = generator()
  721. return next(callback.gen)
  722. callback.gen = None
  723. old_trace = sys.gettrace()
  724. sys.settrace(noop_trace)
  725. try:
  726. # Install a trace function
  727. threading.settrace(noop_trace)
  728. # Create a generator in a C thread which exits after the call
  729. import _testcapi
  730. _testcapi.call_in_temporary_c_thread(callback)
  731. # Call the generator in a different Python thread, check that the
  732. # generator didn't keep a reference to the destroyed thread state
  733. for test in range(3):
  734. # The trace function is still called here
  735. callback()
  736. finally:
  737. sys.settrace(old_trace)
  738. def test_gettrace(self):
  739. def noop_trace(frame, event, arg):
  740. # no operation
  741. return noop_trace
  742. old_trace = threading.gettrace()
  743. try:
  744. threading.settrace(noop_trace)
  745. trace_func = threading.gettrace()
  746. self.assertEqual(noop_trace,trace_func)
  747. finally:
  748. threading.settrace(old_trace)
  749. def test_getprofile(self):
  750. def fn(*args): pass
  751. old_profile = threading.getprofile()
  752. try:
  753. threading.setprofile(fn)
  754. self.assertEqual(fn, threading.getprofile())
  755. finally:
  756. threading.setprofile(old_profile)
  757. @cpython_only
  758. def test_shutdown_locks(self):
  759. for daemon in (False, True):
  760. with self.subTest(daemon=daemon):
  761. event = threading.Event()
  762. thread = threading.Thread(target=event.wait, daemon=daemon)
  763. # Thread.start() must add lock to _shutdown_locks,
  764. # but only for non-daemon thread
  765. thread.start()
  766. tstate_lock = thread._tstate_lock
  767. if not daemon:
  768. self.assertIn(tstate_lock, threading._shutdown_locks)
  769. else:
  770. self.assertNotIn(tstate_lock, threading._shutdown_locks)
  771. # unblock the thread and join it
  772. event.set()
  773. thread.join()
  774. # Thread._stop() must remove tstate_lock from _shutdown_locks.
  775. # Daemon threads must never add it to _shutdown_locks.
  776. self.assertNotIn(tstate_lock, threading._shutdown_locks)
  777. def test_locals_at_exit(self):
  778. # bpo-19466: thread locals must not be deleted before destructors
  779. # are called
  780. rc, out, err = assert_python_ok("-c", """if 1:
  781. import threading
  782. class Atexit:
  783. def __del__(self):
  784. print("thread_dict.atexit = %r" % thread_dict.atexit)
  785. thread_dict = threading.local()
  786. thread_dict.atexit = "value"
  787. atexit = Atexit()
  788. """)
  789. self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
  790. def test_boolean_target(self):
  791. # bpo-41149: A thread that had a boolean value of False would not
  792. # run, regardless of whether it was callable. The correct behaviour
  793. # is for a thread to do nothing if its target is None, and to call
  794. # the target otherwise.
  795. class BooleanTarget(object):
  796. def __init__(self):
  797. self.ran = False
  798. def __bool__(self):
  799. return False
  800. def __call__(self):
  801. self.ran = True
  802. target = BooleanTarget()
  803. thread = threading.Thread(target=target)
  804. thread.start()
  805. thread.join()
  806. self.assertTrue(target.ran)
  807. def test_leak_without_join(self):
  808. # bpo-37788: Test that a thread which is not joined explicitly
  809. # does not leak. Test written for reference leak checks.
  810. def noop(): pass
  811. with threading_helper.wait_threads_exit():
  812. threading.Thread(target=noop).start()
  813. # Thread.join() is not called
  814. @unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)')
  815. def test_debug_deprecation(self):
  816. # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated
  817. rc, out, err = assert_python_ok("-Wdefault", "-c", "pass",
  818. PYTHONTHREADDEBUG="1")
  819. msg = (b'DeprecationWarning: The threading debug '
  820. b'(PYTHONTHREADDEBUG environment variable) '
  821. b'is deprecated and will be removed in Python 3.12')
  822. self.assertIn(msg, err)
  823. def test_import_from_another_thread(self):
  824. # bpo-1596321: If the threading module is first import from a thread
  825. # different than the main thread, threading._shutdown() must handle
  826. # this case without logging an error at Python exit.
  827. code = textwrap.dedent('''
  828. import _thread
  829. import sys
  830. event = _thread.allocate_lock()
  831. event.acquire()
  832. def import_threading():
  833. import threading
  834. event.release()
  835. if 'threading' in sys.modules:
  836. raise Exception('threading is already imported')
  837. _thread.start_new_thread(import_threading, ())
  838. # wait until the threading module is imported
  839. event.acquire()
  840. event.release()
  841. if 'threading' not in sys.modules:
  842. raise Exception('threading is not imported')
  843. # don't wait until the thread completes
  844. ''')
  845. rc, out, err = assert_python_ok("-c", code)
  846. self.assertEqual(out, b'')
  847. self.assertEqual(err, b'')
class ThreadJoinOnShutdown(BaseTestCase):
    # Tests about joining threads when the interpreter shuts down, most of
    # them driven through scripts run in a child interpreter, plus tests
    # about thread state after os.fork().

    def _run_and_join(self, script):
        # Prepend a preamble that imports sys/os/time/threading and defines
        # joiningfunc() to `script`, run the result through
        # assert_python_ok("-c", ...) and check that stdout is exactly
        # "end of main" followed by "end of thread".
        script = """if 1:
            import sys, os, time, threading
            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        \n""" + script

        rc, out, err = assert_python_ok("-c", script)
        # Drop carriage returns before comparing against "\n" line endings.
        data = out.decode().replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")

    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        script = """if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            """
        self._run_and_join(script)

    @support.requires_fork()
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        script = """if 1:
            from test import support
            childpid = os.fork()
            if childpid != 0:
                # parent process
                support.wait_process(childpid, exitcode=0)
                sys.exit(0)
            # child process
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            """
        self._run_and_join(script)

    @support.requires_fork()
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.
        script = """if 1:
            from test import support
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    # parent process
                    support.wait_process(childpid, exitcode=0)
                    sys.exit(0)
                # child process
                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped
            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)

    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_4_daemon_threads(self):
        # Check that a daemon thread cannot crash the interpreter on shutdown
        # by manipulating internal structures that are being disposed of in
        # the main thread.
        script = """if True:
            import os
            import random
            import sys
            import time
            import threading
            thread_has_run = set()
            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                import test.test_threading as mod
                while True:
                    with open(mod.__file__, 'rb') as in_f:
                        stuff = in_f.read(200)
                        with open(os.devnull, 'wb') as null_f:
                            null_f.write(stuff)
                            time.sleep(random.random() / 1995)
                    thread_has_run.add(threading.current_thread())
            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)
            main()
            """
        rc, out, err = assert_python_ok('-c', script)
        # The child must not have written anything to stderr.
        self.assertFalse(err)

    @support.requires_fork()
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_reinit_tls_after_fork(self):
        # Issue #13817: fork() would deadlock in a multithreaded program with
        # the ad-hoc TLS implementation.

        def do_fork_and_wait():
            # just fork a child process and wait it
            pid = os.fork()
            if pid > 0:
                # parent: the child is expected to exit with status 50
                support.wait_process(pid, exitcode=50)
            else:
                # child: exit immediately with the status the parent expects
                os._exit(50)

        # start a bunch of threads that will fork() child processes
        threads = []
        for i in range(16):
            t = threading.Thread(target=do_fork_and_wait)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

    @support.requires_fork()
    def test_clear_threads_states_after_fork(self):
        # Issue #17094: check that threads states are cleared after fork()

        # start a bunch of threads
        threads = []
        for i in range(16):
            t = threading.Thread(target=lambda : time.sleep(0.3))
            threads.append(t)
            t.start()

        pid = os.fork()
        if pid == 0:
            # check that threads states have been cleared
            # (the child reports the outcome through its exit status:
            # 51 if exactly one frame is listed, 52 otherwise)
            if len(sys._current_frames()) == 1:
                os._exit(51)
            else:
                os._exit(52)
        else:
            support.wait_process(pid, exitcode=51)

        for t in threads:
            t.join()
class SubinterpThreadingTests(BaseTestCase):
    # Tests about threads started from code run in a subinterpreter.

    def pipe(self):
        # Create an OS pipe whose two ends are closed by test cleanups and
        # return (read_fd, write_fd).  Where os.set_blocking() exists, the
        # read end is made non-blocking.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        if hasattr(os, 'set_blocking'):
            os.set_blocking(r, False)
        return (r, w)

    def test_threads_join(self):
        # Non-daemon threads should be joined at subinterpreter shutdown
        # (issue #18808)
        r, w = self.pipe()
        # The write end's descriptor number is substituted for %d so that
        # the thread started by the script can write to the pipe.
        code = textwrap.dedent(r"""
            import os
            import random
            import threading
            import time
            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)
            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            random_sleep()
        """ % (w,))
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")

    def test_threads_join_2(self):
        # Same as above, but a delay gets introduced after the thread's
        # Python code returned but before the thread state is deleted.
        # To achieve this, we register a thread-local object which sleeps
        # a bit when deallocated.
        r, w = self.pipe()
        code = textwrap.dedent(r"""
            import os
            import random
            import threading
            import time
            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)
            class Sleeper:
                def __del__(self):
                    random_sleep()
            tls = threading.local()
            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                tls.x = Sleeper()
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            random_sleep()
        """ % (w,))
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")

    @cpython_only
    def test_daemon_threads_fatal_error(self):
        # A daemon thread that is still sleeping when the subinterpreter is
        # ended must make the child process fail with the
        # "Py_EndInterpreter: not the last thread" fatal error.
        subinterp_code = f"""if 1:
            import os
            import threading
            import time
            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep({test.support.SHORT_TIMEOUT})
            threading.Thread(target=f, daemon=True).start()
            """
        # %r embeds the subinterpreter code as a string literal in the
        # script run by the child process.
        script = r"""if 1:
            import _testcapi
            _testcapi.run_in_subinterp(%r)
            """ % (subinterp_code,)
        with test.support.SuppressCrashReport():
            rc, out, err = assert_python_failure("-c", script)
        self.assertIn("Fatal Python error: Py_EndInterpreter: "
                      "not the last thread", err.decode())
  1076. class ThreadingExceptionTests(BaseTestCase):
  1077. # A RuntimeError should be raised if Thread.start() is called
  1078. # multiple times.
  1079. def test_start_thread_again(self):
  1080. thread = threading.Thread()
  1081. thread.start()
  1082. self.assertRaises(RuntimeError, thread.start)
  1083. thread.join()
  1084. def test_joining_current_thread(self):
  1085. current_thread = threading.current_thread()
  1086. self.assertRaises(RuntimeError, current_thread.join);
  1087. def test_joining_inactive_thread(self):
  1088. thread = threading.Thread()
  1089. self.assertRaises(RuntimeError, thread.join)
  1090. def test_daemonize_active_thread(self):
  1091. thread = threading.Thread()
  1092. thread.start()
  1093. self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
  1094. thread.join()
  1095. def test_releasing_unacquired_lock(self):
  1096. lock = threading.Lock()
  1097. self.assertRaises(RuntimeError, lock.release)
  1098. @requires_subprocess()
  1099. def test_recursion_limit(self):
  1100. # Issue 9670
  1101. # test that excessive recursion within a non-main thread causes
  1102. # an exception rather than crashing the interpreter on platforms
  1103. # like Mac OS X or FreeBSD which have small default stack sizes
  1104. # for threads
  1105. script = """if True:
  1106. import threading
  1107. def recurse():
  1108. return recurse()
  1109. def outer():
  1110. try:
  1111. recurse()
  1112. except RecursionError:
  1113. pass
  1114. w = threading.Thread(target=outer)
  1115. w.start()
  1116. w.join()
  1117. print('end of main thread')
  1118. """
  1119. expected_output = "end of main thread\n"
  1120. p = subprocess.Popen([sys.executable, "-c", script],
  1121. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  1122. stdout, stderr = p.communicate()
  1123. data = stdout.decode().replace('\r', '')
  1124. self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
  1125. self.assertEqual(data, expected_output)
  1126. def test_print_exception(self):
  1127. script = r"""if True:
  1128. import threading
  1129. import time
  1130. running = False
  1131. def run():
  1132. global running
  1133. running = True
  1134. while running:
  1135. time.sleep(0.01)
  1136. 1/0
  1137. t = threading.Thread(target=run)
  1138. t.start()
  1139. while not running:
  1140. time.sleep(0.01)
  1141. running = False
  1142. t.join()
  1143. """
  1144. rc, out, err = assert_python_ok("-c", script)
  1145. self.assertEqual(out, b'')
  1146. err = err.decode()
  1147. self.assertIn("Exception in thread", err)
  1148. self.assertIn("Traceback (most recent call last):", err)
  1149. self.assertIn("ZeroDivisionError", err)
  1150. self.assertNotIn("Unhandled exception", err)
  1151. def test_print_exception_stderr_is_none_1(self):
  1152. script = r"""if True:
  1153. import sys
  1154. import threading
  1155. import time
  1156. running = False
  1157. def run():
  1158. global running
  1159. running = True
  1160. while running:
  1161. time.sleep(0.01)
  1162. 1/0
  1163. t = threading.Thread(target=run)
  1164. t.start()
  1165. while not running:
  1166. time.sleep(0.01)
  1167. sys.stderr = None
  1168. running = False
  1169. t.join()
  1170. """
  1171. rc, out, err = assert_python_ok("-c", script)
  1172. self.assertEqual(out, b'')
  1173. err = err.decode()
  1174. self.assertIn("Exception in thread", err)
  1175. self.assertIn("Traceback (most recent call last):", err)
  1176. self.assertIn("ZeroDivisionError", err)
  1177. self.assertNotIn("Unhandled exception", err)
  1178. def test_print_exception_stderr_is_none_2(self):
  1179. script = r"""if True:
  1180. import sys
  1181. import threading
  1182. import time
  1183. running = False
  1184. def run():
  1185. global running
  1186. running = True
  1187. while running:
  1188. time.sleep(0.01)
  1189. 1/0
  1190. sys.stderr = None
  1191. t = threading.Thread(target=run)
  1192. t.start()
  1193. while not running:
  1194. time.sleep(0.01)
  1195. running = False
  1196. t.join()
  1197. """
  1198. rc, out, err = assert_python_ok("-c", script)
  1199. self.assertEqual(out, b'')
  1200. self.assertNotIn("Unhandled exception", err.decode())
  1201. def test_bare_raise_in_brand_new_thread(self):
  1202. def bare_raise():
  1203. raise
  1204. class Issue27558(threading.Thread):
  1205. exc = None
  1206. def run(self):
  1207. try:
  1208. bare_raise()
  1209. except Exception as exc:
  1210. self.exc = exc
  1211. thread = Issue27558()
  1212. thread.start()
  1213. thread.join()
  1214. self.assertIsNotNone(thread.exc)
  1215. self.assertIsInstance(thread.exc, RuntimeError)
  1216. # explicitly break the reference cycle to not leak a dangling thread
  1217. thread.exc = None
  1218. def test_multithread_modify_file_noerror(self):
  1219. # See issue25872
  1220. def modify_file():
  1221. with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
  1222. fp.write(' ')
  1223. traceback.format_stack()
  1224. self.addCleanup(os_helper.unlink, os_helper.TESTFN)
  1225. threads = [
  1226. threading.Thread(target=modify_file)
  1227. for i in range(100)
  1228. ]
  1229. for t in threads:
  1230. t.start()
  1231. t.join()
class ThreadRunFail(threading.Thread):
    # Thread whose run() always fails with ValueError("run failed").
    # NOTE: ExceptHookTests looks for the source text of the raise
    # statement below in the printed traceback, so keep it as written.
    def run(self):
        raise ValueError("run failed")
  1235. class ExceptHookTests(BaseTestCase):
  1236. def setUp(self):
  1237. restore_default_excepthook(self)
  1238. super().setUp()
  1239. def test_excepthook(self):
  1240. with support.captured_output("stderr") as stderr:
  1241. thread = ThreadRunFail(name="excepthook thread")
  1242. thread.start()
  1243. thread.join()
  1244. stderr = stderr.getvalue().strip()
  1245. self.assertIn(f'Exception in thread {thread.name}:\n', stderr)
  1246. self.assertIn('Traceback (most recent call last):\n', stderr)
  1247. self.assertIn(' raise ValueError("run failed")', stderr)
  1248. self.assertIn('ValueError: run failed', stderr)
  1249. @support.cpython_only
  1250. def test_excepthook_thread_None(self):
  1251. # threading.excepthook called with thread=None: log the thread
  1252. # identifier in this case.
  1253. with support.captured_output("stderr") as stderr:
  1254. try:
  1255. raise ValueError("bug")
  1256. except Exception as exc:
  1257. args = threading.ExceptHookArgs([*sys.exc_info(), None])
  1258. try:
  1259. threading.excepthook(args)
  1260. finally:
  1261. # Explicitly break a reference cycle
  1262. args = None
  1263. stderr = stderr.getvalue().strip()
  1264. self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
  1265. self.assertIn('Traceback (most recent call last):\n', stderr)
  1266. self.assertIn(' raise ValueError("bug")', stderr)
  1267. self.assertIn('ValueError: bug', stderr)
  1268. def test_system_exit(self):
  1269. class ThreadExit(threading.Thread):
  1270. def run(self):
  1271. sys.exit(1)
  1272. # threading.excepthook() silently ignores SystemExit
  1273. with support.captured_output("stderr") as stderr:
  1274. thread = ThreadExit()
  1275. thread.start()
  1276. thread.join()
  1277. self.assertEqual(stderr.getvalue(), '')
  1278. def test_custom_excepthook(self):
  1279. args = None
  1280. def hook(hook_args):
  1281. nonlocal args
  1282. args = hook_args
  1283. try:
  1284. with support.swap_attr(threading, 'excepthook', hook):
  1285. thread = ThreadRunFail()
  1286. thread.start()
  1287. thread.join()
  1288. self.assertEqual(args.exc_type, ValueError)
  1289. self.assertEqual(str(args.exc_value), 'run failed')
  1290. self.assertEqual(args.exc_traceback, args.exc_value.__traceback__)
  1291. self.assertIs(args.thread, thread)
  1292. finally:
  1293. # Break reference cycle
  1294. args = None
  1295. def test_custom_excepthook_fail(self):
  1296. def threading_hook(args):
  1297. raise ValueError("threading_hook failed")
  1298. err_str = None
  1299. def sys_hook(exc_type, exc_value, exc_traceback):
  1300. nonlocal err_str
  1301. err_str = str(exc_value)
  1302. with support.swap_attr(threading, 'excepthook', threading_hook), \
  1303. support.swap_attr(sys, 'excepthook', sys_hook), \
  1304. support.captured_output('stderr') as stderr:
  1305. thread = ThreadRunFail()
  1306. thread.start()
  1307. thread.join()
  1308. self.assertEqual(stderr.getvalue(),
  1309. 'Exception in threading.excepthook:\n')
  1310. self.assertEqual(err_str, 'threading_hook failed')
  1311. def test_original_excepthook(self):
  1312. def run_thread():
  1313. with support.captured_output("stderr") as output:
  1314. thread = ThreadRunFail(name="excepthook thread")
  1315. thread.start()
  1316. thread.join()
  1317. return output.getvalue()
  1318. def threading_hook(args):
  1319. print("Running a thread failed", file=sys.stderr)
  1320. default_output = run_thread()
  1321. with support.swap_attr(threading, 'excepthook', threading_hook):
  1322. custom_hook_output = run_thread()
  1323. threading.excepthook = threading.__excepthook__
  1324. recovered_output = run_thread()
  1325. self.assertEqual(default_output, recovered_output)
  1326. self.assertNotEqual(default_output, custom_hook_output)
  1327. self.assertEqual(custom_hook_output, "Running a thread failed\n")
  1328. class TimerTests(BaseTestCase):
  1329. def setUp(self):
  1330. BaseTestCase.setUp(self)
  1331. self.callback_args = []
  1332. self.callback_event = threading.Event()
  1333. def test_init_immutable_default_args(self):
  1334. # Issue 17435: constructor defaults were mutable objects, they could be
  1335. # mutated via the object attributes and affect other Timer objects.
  1336. timer1 = threading.Timer(0.01, self._callback_spy)
  1337. timer1.start()
  1338. self.callback_event.wait()
  1339. timer1.args.append("blah")
  1340. timer1.kwargs["foo"] = "bar"
  1341. self.callback_event.clear()
  1342. timer2 = threading.Timer(0.01, self._callback_spy)
  1343. timer2.start()
  1344. self.callback_event.wait()
  1345. self.assertEqual(len(self.callback_args), 2)
  1346. self.assertEqual(self.callback_args, [((), {}), ((), {})])
  1347. timer1.join()
  1348. timer2.join()
  1349. def _callback_spy(self, *args, **kwargs):
  1350. self.callback_args.append((args[:], kwargs.copy()))
  1351. self.callback_event.set()
class LockTests(lock_tests.LockTests):
    # Inherits the tests of lock_tests.LockTests with threading.Lock as the
    # lock type.  staticmethod() stores the factory as a plain callable.
    locktype = staticmethod(threading.Lock)
class PyRLockTests(lock_tests.RLockTests):
    # Inherits the tests of lock_tests.RLockTests with threading._PyRLock
    # as the lock type (presumably the pure-Python RLock; see threading.py).
    locktype = staticmethod(threading._PyRLock)
@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
class CRLockTests(lock_tests.RLockTests):
    # Inherits the tests of lock_tests.RLockTests with threading._CRLock as
    # the lock type; skipped when threading._CRLock is None.
    locktype = staticmethod(threading._CRLock)
class EventTests(lock_tests.EventTests):
    # Inherits the tests of lock_tests.EventTests with threading.Event as
    # the event type.
    eventtype = staticmethod(threading.Event)
class ConditionAsRLockTests(lock_tests.RLockTests):
    # Condition uses an RLock by default and exports its API, so the
    # inherited lock_tests.RLockTests are run with threading.Condition as
    # the lock type.
    locktype = staticmethod(threading.Condition)
class ConditionTests(lock_tests.ConditionTests):
    # Inherits the tests of lock_tests.ConditionTests with
    # threading.Condition as the condition type.
    condtype = staticmethod(threading.Condition)
class SemaphoreTests(lock_tests.SemaphoreTests):
    # Inherits the tests of lock_tests.SemaphoreTests with
    # threading.Semaphore as the semaphore type.
    semtype = staticmethod(threading.Semaphore)
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    # Inherits the tests of lock_tests.BoundedSemaphoreTests with
    # threading.BoundedSemaphore as the semaphore type.
    semtype = staticmethod(threading.BoundedSemaphore)
class BarrierTests(lock_tests.BarrierTests):
    # Inherits the tests of lock_tests.BarrierTests with threading.Barrier
    # as the barrier type.
    barriertype = staticmethod(threading.Barrier)
  1372. class MiscTestCase(unittest.TestCase):
  1373. def test__all__(self):
  1374. restore_default_excepthook(self)
  1375. extra = {"ThreadError"}
  1376. not_exported = {'currentThread', 'activeCount'}
  1377. support.check__all__(self, threading, ('threading', '_thread'),
  1378. extra=extra, not_exported=not_exported)
  1379. class InterruptMainTests(unittest.TestCase):
  1380. def check_interrupt_main_with_signal_handler(self, signum):
  1381. def handler(signum, frame):
  1382. 1/0
  1383. old_handler = signal.signal(signum, handler)
  1384. self.addCleanup(signal.signal, signum, old_handler)
  1385. with self.assertRaises(ZeroDivisionError):
  1386. _thread.interrupt_main()
  1387. def check_interrupt_main_noerror(self, signum):
  1388. handler = signal.getsignal(signum)
  1389. try:
  1390. # No exception should arise.
  1391. signal.signal(signum, signal.SIG_IGN)
  1392. _thread.interrupt_main(signum)
  1393. signal.signal(signum, signal.SIG_DFL)
  1394. _thread.interrupt_main(signum)
  1395. finally:
  1396. # Restore original handler
  1397. signal.signal(signum, handler)
  1398. def test_interrupt_main_subthread(self):
  1399. # Calling start_new_thread with a function that executes interrupt_main
  1400. # should raise KeyboardInterrupt upon completion.
  1401. def call_interrupt():
  1402. _thread.interrupt_main()
  1403. t = threading.Thread(target=call_interrupt)
  1404. with self.assertRaises(KeyboardInterrupt):
  1405. t.start()
  1406. t.join()
  1407. t.join()
  1408. def test_interrupt_main_mainthread(self):
  1409. # Make sure that if interrupt_main is called in main thread that
  1410. # KeyboardInterrupt is raised instantly.
  1411. with self.assertRaises(KeyboardInterrupt):
  1412. _thread.interrupt_main()
  1413. def test_interrupt_main_with_signal_handler(self):
  1414. self.check_interrupt_main_with_signal_handler(signal.SIGINT)
  1415. self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
  1416. def test_interrupt_main_noerror(self):
  1417. self.check_interrupt_main_noerror(signal.SIGINT)
  1418. self.check_interrupt_main_noerror(signal.SIGTERM)
  1419. def test_interrupt_main_invalid_signal(self):
  1420. self.assertRaises(ValueError, _thread.interrupt_main, -1)
  1421. self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
  1422. self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
  1423. @threading_helper.reap_threads
  1424. def test_can_interrupt_tight_loops(self):
  1425. cont = [True]
  1426. started = [False]
  1427. interrupted = [False]
  1428. def worker(started, cont, interrupted):
  1429. iterations = 100_000_000
  1430. started[0] = True
  1431. while cont[0]:
  1432. if iterations:
  1433. iterations -= 1
  1434. else:
  1435. return
  1436. pass
  1437. interrupted[0] = True
  1438. t = threading.Thread(target=worker,args=(started, cont, interrupted))
  1439. t.start()
  1440. while not started[0]:
  1441. pass
  1442. cont[0] = False
  1443. t.join()
  1444. self.assertTrue(interrupted[0])
  1445. class AtexitTests(unittest.TestCase):
  1446. def test_atexit_output(self):
  1447. rc, out, err = assert_python_ok("-c", """if True:
  1448. import threading
  1449. def run_last():
  1450. print('parrot')
  1451. threading._register_atexit(run_last)
  1452. """)
  1453. self.assertFalse(err)
  1454. self.assertEqual(out.strip(), b'parrot')
  1455. def test_atexit_called_once(self):
  1456. rc, out, err = assert_python_ok("-c", """if True:
  1457. import threading
  1458. from unittest.mock import Mock
  1459. mock = Mock()
  1460. threading._register_atexit(mock)
  1461. mock.assert_not_called()
  1462. # force early shutdown to ensure it was called once
  1463. threading._shutdown()
  1464. mock.assert_called_once()
  1465. """)
  1466. self.assertFalse(err)
  1467. def test_atexit_after_shutdown(self):
  1468. # The only way to do this is by registering an atexit within
  1469. # an atexit, which is intended to raise an exception.
  1470. rc, out, err = assert_python_ok("-c", """if True:
  1471. import threading
  1472. def func():
  1473. pass
  1474. def run_last():
  1475. threading._register_atexit(func)
  1476. threading._register_atexit(run_last)
  1477. """)
  1478. self.assertTrue(err)
  1479. self.assertIn("RuntimeError: can't register atexit after shutdown",
  1480. err.decode())
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()