| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760 |
- """
- Tests for the threading module.
- """
- import test.support
- from test.support import threading_helper, requires_subprocess
- from test.support import verbose, cpython_only, os_helper
- from test.support.import_helper import import_module
- from test.support.script_helper import assert_python_ok, assert_python_failure
- import random
- import sys
- import _thread
- import threading
- import time
- import unittest
- import weakref
- import os
- import subprocess
- import signal
- import textwrap
- import traceback
- from unittest import mock
- from test import lock_tests
- from test import support
- threading_helper.requires_working_threading(module=True)
- # Between fork() and exec(), only async-safe functions are allowed (issues
- # #12316 and #11870), and fork() from a worker thread is known to trigger
- # problems with some operating systems (issue #3863): skip problematic tests
- # on platforms known to behave badly.
- platforms_to_skip = ('netbsd5', 'hp-ux11')
- # Is Python built with Py_DEBUG macro defined?
- Py_DEBUG = hasattr(sys, 'gettotalrefcount')
- def restore_default_excepthook(testcase):
- testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
- threading.excepthook = threading.__excepthook__
- # A trivial mutable counter.
- class Counter(object):
- def __init__(self):
- self.value = 0
- def inc(self):
- self.value += 1
- def dec(self):
- self.value -= 1
- def get(self):
- return self.value
- class TestThread(threading.Thread):
- def __init__(self, name, testcase, sema, mutex, nrunning):
- threading.Thread.__init__(self, name=name)
- self.testcase = testcase
- self.sema = sema
- self.mutex = mutex
- self.nrunning = nrunning
- def run(self):
- delay = random.random() / 10000.0
- if verbose:
- print('task %s will run for %.1f usec' %
- (self.name, delay * 1e6))
- with self.sema:
- with self.mutex:
- self.nrunning.inc()
- if verbose:
- print(self.nrunning.get(), 'tasks are running')
- self.testcase.assertLessEqual(self.nrunning.get(), 3)
- time.sleep(delay)
- if verbose:
- print('task', self.name, 'done')
- with self.mutex:
- self.nrunning.dec()
- self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
- if verbose:
- print('%s is finished. %d tasks are running' %
- (self.name, self.nrunning.get()))
- class BaseTestCase(unittest.TestCase):
- def setUp(self):
- self._threads = threading_helper.threading_setup()
- def tearDown(self):
- threading_helper.threading_cleanup(*self._threads)
- test.support.reap_children()
- class ThreadTests(BaseTestCase):
- @cpython_only
- def test_name(self):
- def func(): pass
- thread = threading.Thread(name="myname1")
- self.assertEqual(thread.name, "myname1")
- # Convert int name to str
- thread = threading.Thread(name=123)
- self.assertEqual(thread.name, "123")
- # target name is ignored if name is specified
- thread = threading.Thread(target=func, name="myname2")
- self.assertEqual(thread.name, "myname2")
- with mock.patch.object(threading, '_counter', return_value=2):
- thread = threading.Thread(name="")
- self.assertEqual(thread.name, "Thread-2")
- with mock.patch.object(threading, '_counter', return_value=3):
- thread = threading.Thread()
- self.assertEqual(thread.name, "Thread-3")
- with mock.patch.object(threading, '_counter', return_value=5):
- thread = threading.Thread(target=func)
- self.assertEqual(thread.name, "Thread-5 (func)")
- def test_args_argument(self):
- # bpo-45735: Using list or tuple as *args* in constructor could
- # achieve the same effect.
- num_list = [1]
- num_tuple = (1,)
- str_list = ["str"]
- str_tuple = ("str",)
- list_in_tuple = ([1],)
- tuple_in_list = [(1,)]
- test_cases = (
- (num_list, lambda arg: self.assertEqual(arg, 1)),
- (num_tuple, lambda arg: self.assertEqual(arg, 1)),
- (str_list, lambda arg: self.assertEqual(arg, "str")),
- (str_tuple, lambda arg: self.assertEqual(arg, "str")),
- (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
- (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
- )
- for args, target in test_cases:
- with self.subTest(target=target, args=args):
- t = threading.Thread(target=target, args=args)
- t.start()
- t.join()
- @cpython_only
- def test_disallow_instantiation(self):
- # Ensure that the type disallows instantiation (bpo-43916)
- lock = threading.Lock()
- test.support.check_disallow_instantiation(self, type(lock))
- # Create a bunch of threads, let each do some work, wait until all are
- # done.
- def test_various_ops(self):
- # This takes about n/3 seconds to run (about n/3 clumps of tasks,
- # times about 1 second per clump).
- NUMTASKS = 10
- # no more than 3 of the 10 can run at once
- sema = threading.BoundedSemaphore(value=3)
- mutex = threading.RLock()
- numrunning = Counter()
- threads = []
- for i in range(NUMTASKS):
- t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
- threads.append(t)
- self.assertIsNone(t.ident)
- self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
- t.start()
- if hasattr(threading, 'get_native_id'):
- native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
- self.assertNotIn(None, native_ids)
- self.assertEqual(len(native_ids), NUMTASKS + 1)
- if verbose:
- print('waiting for all tasks to complete')
- for t in threads:
- t.join()
- self.assertFalse(t.is_alive())
- self.assertNotEqual(t.ident, 0)
- self.assertIsNotNone(t.ident)
- self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
- if verbose:
- print('all tasks done')
- self.assertEqual(numrunning.get(), 0)
- def test_ident_of_no_threading_threads(self):
- # The ident still must work for the main thread and dummy threads.
- self.assertIsNotNone(threading.current_thread().ident)
- def f():
- ident.append(threading.current_thread().ident)
- done.set()
- done = threading.Event()
- ident = []
- with threading_helper.wait_threads_exit():
- tid = _thread.start_new_thread(f, ())
- done.wait()
- self.assertEqual(ident[0], tid)
- # Kill the "immortal" _DummyThread
- del threading._active[ident[0]]
- # run with a small(ish) thread stack size (256 KiB)
- def test_various_ops_small_stack(self):
- if verbose:
- print('with 256 KiB thread stack size...')
- try:
- threading.stack_size(262144)
- except _thread.error:
- raise unittest.SkipTest(
- 'platform does not support changing thread stack size')
- self.test_various_ops()
- threading.stack_size(0)
- # run with a large thread stack size (1 MiB)
- def test_various_ops_large_stack(self):
- if verbose:
- print('with 1 MiB thread stack size...')
- try:
- threading.stack_size(0x100000)
- except _thread.error:
- raise unittest.SkipTest(
- 'platform does not support changing thread stack size')
- self.test_various_ops()
- threading.stack_size(0)
- def test_foreign_thread(self):
- # Check that a "foreign" thread can use the threading module.
- def f(mutex):
- # Calling current_thread() forces an entry for the foreign
- # thread to get made in the threading._active map.
- threading.current_thread()
- mutex.release()
- mutex = threading.Lock()
- mutex.acquire()
- with threading_helper.wait_threads_exit():
- tid = _thread.start_new_thread(f, (mutex,))
- # Wait for the thread to finish.
- mutex.acquire()
- self.assertIn(tid, threading._active)
- self.assertIsInstance(threading._active[tid], threading._DummyThread)
- #Issue 29376
- self.assertTrue(threading._active[tid].is_alive())
- self.assertRegex(repr(threading._active[tid]), '_DummyThread')
- del threading._active[tid]
- # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
- # exposed at the Python level. This test relies on ctypes to get at it.
- def test_PyThreadState_SetAsyncExc(self):
- ctypes = import_module("ctypes")
- set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
- set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
- class AsyncExc(Exception):
- pass
- exception = ctypes.py_object(AsyncExc)
- # First check it works when setting the exception from the same thread.
- tid = threading.get_ident()
- self.assertIsInstance(tid, int)
- self.assertGreater(tid, 0)
- try:
- result = set_async_exc(tid, exception)
- # The exception is async, so we might have to keep the VM busy until
- # it notices.
- while True:
- pass
- except AsyncExc:
- pass
- else:
- # This code is unreachable but it reflects the intent. If we wanted
- # to be smarter the above loop wouldn't be infinite.
- self.fail("AsyncExc not raised")
- try:
- self.assertEqual(result, 1) # one thread state modified
- except UnboundLocalError:
- # The exception was raised too quickly for us to get the result.
- pass
- # `worker_started` is set by the thread when it's inside a try/except
- # block waiting to catch the asynchronously set AsyncExc exception.
- # `worker_saw_exception` is set by the thread upon catching that
- # exception.
- worker_started = threading.Event()
- worker_saw_exception = threading.Event()
- class Worker(threading.Thread):
- def run(self):
- self.id = threading.get_ident()
- self.finished = False
- try:
- while True:
- worker_started.set()
- time.sleep(0.1)
- except AsyncExc:
- self.finished = True
- worker_saw_exception.set()
- t = Worker()
- t.daemon = True # so if this fails, we don't hang Python at shutdown
- t.start()
- if verbose:
- print(" started worker thread")
- # Try a thread id that doesn't make sense.
- if verbose:
- print(" trying nonsensical thread id")
- result = set_async_exc(-1, exception)
- self.assertEqual(result, 0) # no thread states modified
- # Now raise an exception in the worker thread.
- if verbose:
- print(" waiting for worker thread to get started")
- ret = worker_started.wait()
- self.assertTrue(ret)
- if verbose:
- print(" verifying worker hasn't exited")
- self.assertFalse(t.finished)
- if verbose:
- print(" attempting to raise asynch exception in worker")
- result = set_async_exc(t.id, exception)
- self.assertEqual(result, 1) # one thread state modified
- if verbose:
- print(" waiting for worker to say it caught the exception")
- worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
- self.assertTrue(t.finished)
- if verbose:
- print(" all OK -- joining worker")
- if t.finished:
- t.join()
- # else the thread is still running, and we have no way to kill it
- def test_limbo_cleanup(self):
- # Issue 7481: Failure to start thread should cleanup the limbo map.
- def fail_new_thread(*args):
- raise threading.ThreadError()
- _start_new_thread = threading._start_new_thread
- threading._start_new_thread = fail_new_thread
- try:
- t = threading.Thread(target=lambda: None)
- self.assertRaises(threading.ThreadError, t.start)
- self.assertFalse(
- t in threading._limbo,
- "Failed to cleanup _limbo map on failure of Thread.start().")
- finally:
- threading._start_new_thread = _start_new_thread
- def test_finalize_running_thread(self):
- # Issue 1402: the PyGILState_Ensure / _Release functions may be called
- # very late on python exit: on deallocation of a running thread for
- # example.
- import_module("ctypes")
- rc, out, err = assert_python_failure("-c", """if 1:
- import ctypes, sys, time, _thread
- # This lock is used as a simple event variable.
- ready = _thread.allocate_lock()
- ready.acquire()
- # Module globals are cleared before __del__ is run
- # So we save the functions in class dict
- class C:
- ensure = ctypes.pythonapi.PyGILState_Ensure
- release = ctypes.pythonapi.PyGILState_Release
- def __del__(self):
- state = self.ensure()
- self.release(state)
- def waitingThread():
- x = C()
- ready.release()
- time.sleep(100)
- _thread.start_new_thread(waitingThread, ())
- ready.acquire() # Be sure the other thread is waiting.
- sys.exit(42)
- """)
- self.assertEqual(rc, 42)
- def test_finalize_with_trace(self):
- # Issue1733757
- # Avoid a deadlock when sys.settrace steps into threading._shutdown
- assert_python_ok("-c", """if 1:
- import sys, threading
- # A deadlock-killer, to prevent the
- # testsuite to hang forever
- def killer():
- import os, time
- time.sleep(2)
- print('program blocked; aborting')
- os._exit(2)
- t = threading.Thread(target=killer)
- t.daemon = True
- t.start()
- # This is the trace function
- def func(frame, event, arg):
- threading.current_thread()
- return func
- sys.settrace(func)
- """)
- def test_join_nondaemon_on_shutdown(self):
- # Issue 1722344
- # Raising SystemExit skipped threading._shutdown
- rc, out, err = assert_python_ok("-c", """if 1:
- import threading
- from time import sleep
- def child():
- sleep(1)
- # As a non-daemon thread we SHOULD wake up and nothing
- # should be torn down yet
- print("Woke up, sleep function is:", sleep)
- threading.Thread(target=child).start()
- raise SystemExit
- """)
- self.assertEqual(out.strip(),
- b"Woke up, sleep function is: <built-in function sleep>")
- self.assertEqual(err, b"")
- def test_enumerate_after_join(self):
- # Try hard to trigger #1703448: a thread is still returned in
- # threading.enumerate() after it has been join()ed.
- enum = threading.enumerate
- old_interval = sys.getswitchinterval()
- try:
- for i in range(1, 100):
- sys.setswitchinterval(i * 0.0002)
- t = threading.Thread(target=lambda: None)
- t.start()
- t.join()
- l = enum()
- self.assertNotIn(t, l,
- "#1703448 triggered after %d trials: %s" % (i, l))
- finally:
- sys.setswitchinterval(old_interval)
- def test_no_refcycle_through_target(self):
- class RunSelfFunction(object):
- def __init__(self, should_raise):
- # The links in this refcycle from Thread back to self
- # should be cleaned up when the thread completes.
- self.should_raise = should_raise
- self.thread = threading.Thread(target=self._run,
- args=(self,),
- kwargs={'yet_another':self})
- self.thread.start()
- def _run(self, other_ref, yet_another):
- if self.should_raise:
- raise SystemExit
- restore_default_excepthook(self)
- cyclic_object = RunSelfFunction(should_raise=False)
- weak_cyclic_object = weakref.ref(cyclic_object)
- cyclic_object.thread.join()
- del cyclic_object
- self.assertIsNone(weak_cyclic_object(),
- msg=('%d references still around' %
- sys.getrefcount(weak_cyclic_object())))
- raising_cyclic_object = RunSelfFunction(should_raise=True)
- weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
- raising_cyclic_object.thread.join()
- del raising_cyclic_object
- self.assertIsNone(weak_raising_cyclic_object(),
- msg=('%d references still around' %
- sys.getrefcount(weak_raising_cyclic_object())))
- def test_old_threading_api(self):
- # Just a quick sanity check to make sure the old method names are
- # still present
- t = threading.Thread()
- with self.assertWarnsRegex(DeprecationWarning,
- r'get the daemon attribute'):
- t.isDaemon()
- with self.assertWarnsRegex(DeprecationWarning,
- r'set the daemon attribute'):
- t.setDaemon(True)
- with self.assertWarnsRegex(DeprecationWarning,
- r'get the name attribute'):
- t.getName()
- with self.assertWarnsRegex(DeprecationWarning,
- r'set the name attribute'):
- t.setName("name")
- e = threading.Event()
- with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
- e.isSet()
- cond = threading.Condition()
- cond.acquire()
- with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
- cond.notifyAll()
- with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
- threading.activeCount()
- with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
- threading.currentThread()
- def test_repr_daemon(self):
- t = threading.Thread()
- self.assertNotIn('daemon', repr(t))
- t.daemon = True
- self.assertIn('daemon', repr(t))
- def test_daemon_param(self):
- t = threading.Thread()
- self.assertFalse(t.daemon)
- t = threading.Thread(daemon=False)
- self.assertFalse(t.daemon)
- t = threading.Thread(daemon=True)
- self.assertTrue(t.daemon)
- @support.requires_fork()
- def test_fork_at_exit(self):
- # bpo-42350: Calling os.fork() after threading._shutdown() must
- # not log an error.
- code = textwrap.dedent("""
- import atexit
- import os
- import sys
- from test.support import wait_process
- # Import the threading module to register its "at fork" callback
- import threading
- def exit_handler():
- pid = os.fork()
- if not pid:
- print("child process ok", file=sys.stderr, flush=True)
- # child process
- else:
- wait_process(pid, exitcode=0)
- # exit_handler() will be called after threading._shutdown()
- atexit.register(exit_handler)
- """)
- _, out, err = assert_python_ok("-c", code)
- self.assertEqual(out, b'')
- self.assertEqual(err.rstrip(), b'child process ok')
- @support.requires_fork()
- def test_dummy_thread_after_fork(self):
- # Issue #14308: a dummy thread in the active list doesn't mess up
- # the after-fork mechanism.
- code = """if 1:
- import _thread, threading, os, time
- def background_thread(evt):
- # Creates and registers the _DummyThread instance
- threading.current_thread()
- evt.set()
- time.sleep(10)
- evt = threading.Event()
- _thread.start_new_thread(background_thread, (evt,))
- evt.wait()
- assert threading.active_count() == 2, threading.active_count()
- if os.fork() == 0:
- assert threading.active_count() == 1, threading.active_count()
- os._exit(0)
- else:
- os.wait()
- """
- _, out, err = assert_python_ok("-c", code)
- self.assertEqual(out, b'')
- self.assertEqual(err, b'')
- @support.requires_fork()
- def test_is_alive_after_fork(self):
- # Try hard to trigger #18418: is_alive() could sometimes be True on
- # threads that vanished after a fork.
- old_interval = sys.getswitchinterval()
- self.addCleanup(sys.setswitchinterval, old_interval)
- # Make the bug more likely to manifest.
- test.support.setswitchinterval(1e-6)
- for i in range(20):
- t = threading.Thread(target=lambda: None)
- t.start()
- pid = os.fork()
- if pid == 0:
- os._exit(11 if t.is_alive() else 10)
- else:
- t.join()
- support.wait_process(pid, exitcode=10)
- def test_main_thread(self):
- main = threading.main_thread()
- self.assertEqual(main.name, 'MainThread')
- self.assertEqual(main.ident, threading.current_thread().ident)
- self.assertEqual(main.ident, threading.get_ident())
- def f():
- self.assertNotEqual(threading.main_thread().ident,
- threading.current_thread().ident)
- th = threading.Thread(target=f)
- th.start()
- th.join()
- @support.requires_fork()
- @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
- def test_main_thread_after_fork(self):
- code = """if 1:
- import os, threading
- from test import support
- pid = os.fork()
- if pid == 0:
- main = threading.main_thread()
- print(main.name)
- print(main.ident == threading.current_thread().ident)
- print(main.ident == threading.get_ident())
- else:
- support.wait_process(pid, exitcode=0)
- """
- _, out, err = assert_python_ok("-c", code)
- data = out.decode().replace('\r', '')
- self.assertEqual(err, b"")
- self.assertEqual(data, "MainThread\nTrue\nTrue\n")
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- @support.requires_fork()
- @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
- def test_main_thread_after_fork_from_nonmain_thread(self):
- code = """if 1:
- import os, threading, sys
- from test import support
- def func():
- pid = os.fork()
- if pid == 0:
- main = threading.main_thread()
- print(main.name)
- print(main.ident == threading.current_thread().ident)
- print(main.ident == threading.get_ident())
- # stdout is fully buffered because not a tty,
- # we have to flush before exit.
- sys.stdout.flush()
- else:
- support.wait_process(pid, exitcode=0)
- th = threading.Thread(target=func)
- th.start()
- th.join()
- """
- _, out, err = assert_python_ok("-c", code)
- data = out.decode().replace('\r', '')
- self.assertEqual(err, b"")
- self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n")
- def test_main_thread_during_shutdown(self):
- # bpo-31516: current_thread() should still point to the main thread
- # at shutdown
- code = """if 1:
- import gc, threading
- main_thread = threading.current_thread()
- assert main_thread is threading.main_thread() # sanity check
- class RefCycle:
- def __init__(self):
- self.cycle = self
- def __del__(self):
- print("GC:",
- threading.current_thread() is main_thread,
- threading.main_thread() is main_thread,
- threading.enumerate() == [main_thread])
- RefCycle()
- gc.collect() # sanity check
- x = RefCycle()
- """
- _, out, err = assert_python_ok("-c", code)
- data = out.decode()
- self.assertEqual(err, b"")
- self.assertEqual(data.splitlines(),
- ["GC: True True True"] * 2)
- def test_finalization_shutdown(self):
- # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
- # until Python thread states of all non-daemon threads get deleted.
- #
- # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
- # test the finalization of the main interpreter.
- code = """if 1:
- import os
- import threading
- import time
- import random
- def random_sleep():
- seconds = random.random() * 0.010
- time.sleep(seconds)
- class Sleeper:
- def __del__(self):
- random_sleep()
- tls = threading.local()
- def f():
- # Sleep a bit so that the thread is still running when
- # Py_Finalize() is called.
- random_sleep()
- tls.x = Sleeper()
- random_sleep()
- threading.Thread(target=f).start()
- random_sleep()
- """
- rc, out, err = assert_python_ok("-c", code)
- self.assertEqual(err, b"")
- def test_tstate_lock(self):
- # Test an implementation detail of Thread objects.
- started = _thread.allocate_lock()
- finish = _thread.allocate_lock()
- started.acquire()
- finish.acquire()
- def f():
- started.release()
- finish.acquire()
- time.sleep(0.01)
- # The tstate lock is None until the thread is started
- t = threading.Thread(target=f)
- self.assertIs(t._tstate_lock, None)
- t.start()
- started.acquire()
- self.assertTrue(t.is_alive())
- # The tstate lock can't be acquired when the thread is running
- # (or suspended).
- tstate_lock = t._tstate_lock
- self.assertFalse(tstate_lock.acquire(timeout=0), False)
- finish.release()
- # When the thread ends, the state_lock can be successfully
- # acquired.
- self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
- # But is_alive() is still True: we hold _tstate_lock now, which
- # prevents is_alive() from knowing the thread's end-of-life C code
- # is done.
- self.assertTrue(t.is_alive())
- # Let is_alive() find out the C code is done.
- tstate_lock.release()
- self.assertFalse(t.is_alive())
- # And verify the thread disposed of _tstate_lock.
- self.assertIsNone(t._tstate_lock)
- t.join()
- def test_repr_stopped(self):
- # Verify that "stopped" shows up in repr(Thread) appropriately.
- started = _thread.allocate_lock()
- finish = _thread.allocate_lock()
- started.acquire()
- finish.acquire()
- def f():
- started.release()
- finish.acquire()
- t = threading.Thread(target=f)
- t.start()
- started.acquire()
- self.assertIn("started", repr(t))
- finish.release()
- # "stopped" should appear in the repr in a reasonable amount of time.
- # Implementation detail: as of this writing, that's trivially true
- # if .join() is called, and almost trivially true if .is_alive() is
- # called. The detail we're testing here is that "stopped" shows up
- # "all on its own".
- LOOKING_FOR = "stopped"
- for i in range(500):
- if LOOKING_FOR in repr(t):
- break
- time.sleep(0.01)
- self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
- t.join()
- def test_BoundedSemaphore_limit(self):
- # BoundedSemaphore should raise ValueError if released too often.
- for limit in range(1, 10):
- bs = threading.BoundedSemaphore(limit)
- threads = [threading.Thread(target=bs.acquire)
- for _ in range(limit)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
- threads = [threading.Thread(target=bs.release)
- for _ in range(limit)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
- self.assertRaises(ValueError, bs.release)
- @cpython_only
- def test_frame_tstate_tracing(self):
- # Issue #14432: Crash when a generator is created in a C thread that is
- # destroyed while the generator is still used. The issue was that a
- # generator contains a frame, and the frame kept a reference to the
- # Python state of the destroyed C thread. The crash occurs when a trace
- # function is setup.
- def noop_trace(frame, event, arg):
- # no operation
- return noop_trace
- def generator():
- while 1:
- yield "generator"
- def callback():
- if callback.gen is None:
- callback.gen = generator()
- return next(callback.gen)
- callback.gen = None
- old_trace = sys.gettrace()
- sys.settrace(noop_trace)
- try:
- # Install a trace function
- threading.settrace(noop_trace)
- # Create a generator in a C thread which exits after the call
- import _testcapi
- _testcapi.call_in_temporary_c_thread(callback)
- # Call the generator in a different Python thread, check that the
- # generator didn't keep a reference to the destroyed thread state
- for test in range(3):
- # The trace function is still called here
- callback()
- finally:
- sys.settrace(old_trace)
- def test_gettrace(self):
- def noop_trace(frame, event, arg):
- # no operation
- return noop_trace
- old_trace = threading.gettrace()
- try:
- threading.settrace(noop_trace)
- trace_func = threading.gettrace()
- self.assertEqual(noop_trace,trace_func)
- finally:
- threading.settrace(old_trace)
- def test_getprofile(self):
- def fn(*args): pass
- old_profile = threading.getprofile()
- try:
- threading.setprofile(fn)
- self.assertEqual(fn, threading.getprofile())
- finally:
- threading.setprofile(old_profile)
- @cpython_only
- def test_shutdown_locks(self):
- for daemon in (False, True):
- with self.subTest(daemon=daemon):
- event = threading.Event()
- thread = threading.Thread(target=event.wait, daemon=daemon)
- # Thread.start() must add lock to _shutdown_locks,
- # but only for non-daemon thread
- thread.start()
- tstate_lock = thread._tstate_lock
- if not daemon:
- self.assertIn(tstate_lock, threading._shutdown_locks)
- else:
- self.assertNotIn(tstate_lock, threading._shutdown_locks)
- # unblock the thread and join it
- event.set()
- thread.join()
- # Thread._stop() must remove tstate_lock from _shutdown_locks.
- # Daemon threads must never add it to _shutdown_locks.
- self.assertNotIn(tstate_lock, threading._shutdown_locks)
- def test_locals_at_exit(self):
- # bpo-19466: thread locals must not be deleted before destructors
- # are called
- rc, out, err = assert_python_ok("-c", """if 1:
- import threading
- class Atexit:
- def __del__(self):
- print("thread_dict.atexit = %r" % thread_dict.atexit)
- thread_dict = threading.local()
- thread_dict.atexit = "value"
- atexit = Atexit()
- """)
- self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
- def test_boolean_target(self):
- # bpo-41149: A thread that had a boolean value of False would not
- # run, regardless of whether it was callable. The correct behaviour
- # is for a thread to do nothing if its target is None, and to call
- # the target otherwise.
- class BooleanTarget(object):
- def __init__(self):
- self.ran = False
- def __bool__(self):
- return False
- def __call__(self):
- self.ran = True
- target = BooleanTarget()
- thread = threading.Thread(target=target)
- thread.start()
- thread.join()
- self.assertTrue(target.ran)
- def test_leak_without_join(self):
- # bpo-37788: Test that a thread which is not joined explicitly
- # does not leak. Test written for reference leak checks.
- def noop(): pass
- with threading_helper.wait_threads_exit():
- threading.Thread(target=noop).start()
- # Thread.join() is not called
- @unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)')
- def test_debug_deprecation(self):
- # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated
- rc, out, err = assert_python_ok("-Wdefault", "-c", "pass",
- PYTHONTHREADDEBUG="1")
- msg = (b'DeprecationWarning: The threading debug '
- b'(PYTHONTHREADDEBUG environment variable) '
- b'is deprecated and will be removed in Python 3.12')
- self.assertIn(msg, err)
- def test_import_from_another_thread(self):
- # bpo-1596321: If the threading module is first import from a thread
- # different than the main thread, threading._shutdown() must handle
- # this case without logging an error at Python exit.
- code = textwrap.dedent('''
- import _thread
- import sys
- event = _thread.allocate_lock()
- event.acquire()
- def import_threading():
- import threading
- event.release()
- if 'threading' in sys.modules:
- raise Exception('threading is already imported')
- _thread.start_new_thread(import_threading, ())
- # wait until the threading module is imported
- event.acquire()
- event.release()
- if 'threading' not in sys.modules:
- raise Exception('threading is not imported')
- # don't wait until the thread completes
- ''')
- rc, out, err = assert_python_ok("-c", code)
- self.assertEqual(out, b'')
- self.assertEqual(err, b'')
- class ThreadJoinOnShutdown(BaseTestCase):
- def _run_and_join(self, script):
- script = """if 1:
- import sys, os, time, threading
- # a thread, which waits for the main program to terminate
- def joiningfunc(mainthread):
- mainthread.join()
- print('end of thread')
- # stdout is fully buffered because not a tty, we have to flush
- # before exit.
- sys.stdout.flush()
- \n""" + script
- rc, out, err = assert_python_ok("-c", script)
- data = out.decode().replace('\r', '')
- self.assertEqual(data, "end of main\nend of thread\n")
- def test_1_join_on_shutdown(self):
- # The usual case: on exit, wait for a non-daemon thread
- script = """if 1:
- import os
- t = threading.Thread(target=joiningfunc,
- args=(threading.current_thread(),))
- t.start()
- time.sleep(0.1)
- print('end of main')
- """
- self._run_and_join(script)
- @support.requires_fork()
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_2_join_in_forked_process(self):
- # Like the test above, but from a forked interpreter
- script = """if 1:
- from test import support
- childpid = os.fork()
- if childpid != 0:
- # parent process
- support.wait_process(childpid, exitcode=0)
- sys.exit(0)
- # child process
- t = threading.Thread(target=joiningfunc,
- args=(threading.current_thread(),))
- t.start()
- print('end of main')
- """
- self._run_and_join(script)
- @support.requires_fork()
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_3_join_in_forked_from_thread(self):
- # Like the test above, but fork() was called from a worker thread
- # In the forked process, the main Thread object must be marked as stopped.
- script = """if 1:
- from test import support
- main_thread = threading.current_thread()
- def worker():
- childpid = os.fork()
- if childpid != 0:
- # parent process
- support.wait_process(childpid, exitcode=0)
- sys.exit(0)
- # child process
- t = threading.Thread(target=joiningfunc,
- args=(main_thread,))
- print('end of main')
- t.start()
- t.join() # Should not block: main_thread is already stopped
- w = threading.Thread(target=worker)
- w.start()
- """
- self._run_and_join(script)
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_4_daemon_threads(self):
- # Check that a daemon thread cannot crash the interpreter on shutdown
- # by manipulating internal structures that are being disposed of in
- # the main thread.
- script = """if True:
- import os
- import random
- import sys
- import time
- import threading
- thread_has_run = set()
- def random_io():
- '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
- import test.test_threading as mod
- while True:
- with open(mod.__file__, 'rb') as in_f:
- stuff = in_f.read(200)
- with open(os.devnull, 'wb') as null_f:
- null_f.write(stuff)
- time.sleep(random.random() / 1995)
- thread_has_run.add(threading.current_thread())
- def main():
- count = 0
- for _ in range(40):
- new_thread = threading.Thread(target=random_io)
- new_thread.daemon = True
- new_thread.start()
- count += 1
- while len(thread_has_run) < count:
- time.sleep(0.001)
- # Trigger process shutdown
- sys.exit(0)
- main()
- """
- rc, out, err = assert_python_ok('-c', script)
- self.assertFalse(err)
- @support.requires_fork()
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_reinit_tls_after_fork(self):
- # Issue #13817: fork() would deadlock in a multithreaded program with
- # the ad-hoc TLS implementation.
- def do_fork_and_wait():
- # just fork a child process and wait it
- pid = os.fork()
- if pid > 0:
- support.wait_process(pid, exitcode=50)
- else:
- os._exit(50)
- # start a bunch of threads that will fork() child processes
- threads = []
- for i in range(16):
- t = threading.Thread(target=do_fork_and_wait)
- threads.append(t)
- t.start()
- for t in threads:
- t.join()
- @support.requires_fork()
- def test_clear_threads_states_after_fork(self):
- # Issue #17094: check that threads states are cleared after fork()
- # start a bunch of threads
- threads = []
- for i in range(16):
- t = threading.Thread(target=lambda : time.sleep(0.3))
- threads.append(t)
- t.start()
- pid = os.fork()
- if pid == 0:
- # check that threads states have been cleared
- if len(sys._current_frames()) == 1:
- os._exit(51)
- else:
- os._exit(52)
- else:
- support.wait_process(pid, exitcode=51)
- for t in threads:
- t.join()
- class SubinterpThreadingTests(BaseTestCase):
- def pipe(self):
- r, w = os.pipe()
- self.addCleanup(os.close, r)
- self.addCleanup(os.close, w)
- if hasattr(os, 'set_blocking'):
- os.set_blocking(r, False)
- return (r, w)
- def test_threads_join(self):
- # Non-daemon threads should be joined at subinterpreter shutdown
- # (issue #18808)
- r, w = self.pipe()
- code = textwrap.dedent(r"""
- import os
- import random
- import threading
- import time
- def random_sleep():
- seconds = random.random() * 0.010
- time.sleep(seconds)
- def f():
- # Sleep a bit so that the thread is still running when
- # Py_EndInterpreter is called.
- random_sleep()
- os.write(%d, b"x")
- threading.Thread(target=f).start()
- random_sleep()
- """ % (w,))
- ret = test.support.run_in_subinterp(code)
- self.assertEqual(ret, 0)
- # The thread was joined properly.
- self.assertEqual(os.read(r, 1), b"x")
- def test_threads_join_2(self):
- # Same as above, but a delay gets introduced after the thread's
- # Python code returned but before the thread state is deleted.
- # To achieve this, we register a thread-local object which sleeps
- # a bit when deallocated.
- r, w = self.pipe()
- code = textwrap.dedent(r"""
- import os
- import random
- import threading
- import time
- def random_sleep():
- seconds = random.random() * 0.010
- time.sleep(seconds)
- class Sleeper:
- def __del__(self):
- random_sleep()
- tls = threading.local()
- def f():
- # Sleep a bit so that the thread is still running when
- # Py_EndInterpreter is called.
- random_sleep()
- tls.x = Sleeper()
- os.write(%d, b"x")
- threading.Thread(target=f).start()
- random_sleep()
- """ % (w,))
- ret = test.support.run_in_subinterp(code)
- self.assertEqual(ret, 0)
- # The thread was joined properly.
- self.assertEqual(os.read(r, 1), b"x")
- @cpython_only
- def test_daemon_threads_fatal_error(self):
- subinterp_code = f"""if 1:
- import os
- import threading
- import time
- def f():
- # Make sure the daemon thread is still running when
- # Py_EndInterpreter is called.
- time.sleep({test.support.SHORT_TIMEOUT})
- threading.Thread(target=f, daemon=True).start()
- """
- script = r"""if 1:
- import _testcapi
- _testcapi.run_in_subinterp(%r)
- """ % (subinterp_code,)
- with test.support.SuppressCrashReport():
- rc, out, err = assert_python_failure("-c", script)
- self.assertIn("Fatal Python error: Py_EndInterpreter: "
- "not the last thread", err.decode())
- class ThreadingExceptionTests(BaseTestCase):
- # A RuntimeError should be raised if Thread.start() is called
- # multiple times.
- def test_start_thread_again(self):
- thread = threading.Thread()
- thread.start()
- self.assertRaises(RuntimeError, thread.start)
- thread.join()
- def test_joining_current_thread(self):
- current_thread = threading.current_thread()
- self.assertRaises(RuntimeError, current_thread.join);
- def test_joining_inactive_thread(self):
- thread = threading.Thread()
- self.assertRaises(RuntimeError, thread.join)
- def test_daemonize_active_thread(self):
- thread = threading.Thread()
- thread.start()
- self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
- thread.join()
- def test_releasing_unacquired_lock(self):
- lock = threading.Lock()
- self.assertRaises(RuntimeError, lock.release)
- @requires_subprocess()
- def test_recursion_limit(self):
- # Issue 9670
- # test that excessive recursion within a non-main thread causes
- # an exception rather than crashing the interpreter on platforms
- # like Mac OS X or FreeBSD which have small default stack sizes
- # for threads
- script = """if True:
- import threading
- def recurse():
- return recurse()
- def outer():
- try:
- recurse()
- except RecursionError:
- pass
- w = threading.Thread(target=outer)
- w.start()
- w.join()
- print('end of main thread')
- """
- expected_output = "end of main thread\n"
- p = subprocess.Popen([sys.executable, "-c", script],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- data = stdout.decode().replace('\r', '')
- self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
- self.assertEqual(data, expected_output)
- def test_print_exception(self):
- script = r"""if True:
- import threading
- import time
- running = False
- def run():
- global running
- running = True
- while running:
- time.sleep(0.01)
- 1/0
- t = threading.Thread(target=run)
- t.start()
- while not running:
- time.sleep(0.01)
- running = False
- t.join()
- """
- rc, out, err = assert_python_ok("-c", script)
- self.assertEqual(out, b'')
- err = err.decode()
- self.assertIn("Exception in thread", err)
- self.assertIn("Traceback (most recent call last):", err)
- self.assertIn("ZeroDivisionError", err)
- self.assertNotIn("Unhandled exception", err)
- def test_print_exception_stderr_is_none_1(self):
- script = r"""if True:
- import sys
- import threading
- import time
- running = False
- def run():
- global running
- running = True
- while running:
- time.sleep(0.01)
- 1/0
- t = threading.Thread(target=run)
- t.start()
- while not running:
- time.sleep(0.01)
- sys.stderr = None
- running = False
- t.join()
- """
- rc, out, err = assert_python_ok("-c", script)
- self.assertEqual(out, b'')
- err = err.decode()
- self.assertIn("Exception in thread", err)
- self.assertIn("Traceback (most recent call last):", err)
- self.assertIn("ZeroDivisionError", err)
- self.assertNotIn("Unhandled exception", err)
- def test_print_exception_stderr_is_none_2(self):
- script = r"""if True:
- import sys
- import threading
- import time
- running = False
- def run():
- global running
- running = True
- while running:
- time.sleep(0.01)
- 1/0
- sys.stderr = None
- t = threading.Thread(target=run)
- t.start()
- while not running:
- time.sleep(0.01)
- running = False
- t.join()
- """
- rc, out, err = assert_python_ok("-c", script)
- self.assertEqual(out, b'')
- self.assertNotIn("Unhandled exception", err.decode())
- def test_bare_raise_in_brand_new_thread(self):
- def bare_raise():
- raise
- class Issue27558(threading.Thread):
- exc = None
- def run(self):
- try:
- bare_raise()
- except Exception as exc:
- self.exc = exc
- thread = Issue27558()
- thread.start()
- thread.join()
- self.assertIsNotNone(thread.exc)
- self.assertIsInstance(thread.exc, RuntimeError)
- # explicitly break the reference cycle to not leak a dangling thread
- thread.exc = None
- def test_multithread_modify_file_noerror(self):
- # See issue25872
- def modify_file():
- with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
- fp.write(' ')
- traceback.format_stack()
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- threads = [
- threading.Thread(target=modify_file)
- for i in range(100)
- ]
- for t in threads:
- t.start()
- t.join()
- class ThreadRunFail(threading.Thread):
- def run(self):
- raise ValueError("run failed")
- class ExceptHookTests(BaseTestCase):
- def setUp(self):
- restore_default_excepthook(self)
- super().setUp()
- def test_excepthook(self):
- with support.captured_output("stderr") as stderr:
- thread = ThreadRunFail(name="excepthook thread")
- thread.start()
- thread.join()
- stderr = stderr.getvalue().strip()
- self.assertIn(f'Exception in thread {thread.name}:\n', stderr)
- self.assertIn('Traceback (most recent call last):\n', stderr)
- self.assertIn(' raise ValueError("run failed")', stderr)
- self.assertIn('ValueError: run failed', stderr)
- @support.cpython_only
- def test_excepthook_thread_None(self):
- # threading.excepthook called with thread=None: log the thread
- # identifier in this case.
- with support.captured_output("stderr") as stderr:
- try:
- raise ValueError("bug")
- except Exception as exc:
- args = threading.ExceptHookArgs([*sys.exc_info(), None])
- try:
- threading.excepthook(args)
- finally:
- # Explicitly break a reference cycle
- args = None
- stderr = stderr.getvalue().strip()
- self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
- self.assertIn('Traceback (most recent call last):\n', stderr)
- self.assertIn(' raise ValueError("bug")', stderr)
- self.assertIn('ValueError: bug', stderr)
- def test_system_exit(self):
- class ThreadExit(threading.Thread):
- def run(self):
- sys.exit(1)
- # threading.excepthook() silently ignores SystemExit
- with support.captured_output("stderr") as stderr:
- thread = ThreadExit()
- thread.start()
- thread.join()
- self.assertEqual(stderr.getvalue(), '')
- def test_custom_excepthook(self):
- args = None
- def hook(hook_args):
- nonlocal args
- args = hook_args
- try:
- with support.swap_attr(threading, 'excepthook', hook):
- thread = ThreadRunFail()
- thread.start()
- thread.join()
- self.assertEqual(args.exc_type, ValueError)
- self.assertEqual(str(args.exc_value), 'run failed')
- self.assertEqual(args.exc_traceback, args.exc_value.__traceback__)
- self.assertIs(args.thread, thread)
- finally:
- # Break reference cycle
- args = None
- def test_custom_excepthook_fail(self):
- def threading_hook(args):
- raise ValueError("threading_hook failed")
- err_str = None
- def sys_hook(exc_type, exc_value, exc_traceback):
- nonlocal err_str
- err_str = str(exc_value)
- with support.swap_attr(threading, 'excepthook', threading_hook), \
- support.swap_attr(sys, 'excepthook', sys_hook), \
- support.captured_output('stderr') as stderr:
- thread = ThreadRunFail()
- thread.start()
- thread.join()
- self.assertEqual(stderr.getvalue(),
- 'Exception in threading.excepthook:\n')
- self.assertEqual(err_str, 'threading_hook failed')
- def test_original_excepthook(self):
- def run_thread():
- with support.captured_output("stderr") as output:
- thread = ThreadRunFail(name="excepthook thread")
- thread.start()
- thread.join()
- return output.getvalue()
- def threading_hook(args):
- print("Running a thread failed", file=sys.stderr)
- default_output = run_thread()
- with support.swap_attr(threading, 'excepthook', threading_hook):
- custom_hook_output = run_thread()
- threading.excepthook = threading.__excepthook__
- recovered_output = run_thread()
- self.assertEqual(default_output, recovered_output)
- self.assertNotEqual(default_output, custom_hook_output)
- self.assertEqual(custom_hook_output, "Running a thread failed\n")
- class TimerTests(BaseTestCase):
- def setUp(self):
- BaseTestCase.setUp(self)
- self.callback_args = []
- self.callback_event = threading.Event()
- def test_init_immutable_default_args(self):
- # Issue 17435: constructor defaults were mutable objects, they could be
- # mutated via the object attributes and affect other Timer objects.
- timer1 = threading.Timer(0.01, self._callback_spy)
- timer1.start()
- self.callback_event.wait()
- timer1.args.append("blah")
- timer1.kwargs["foo"] = "bar"
- self.callback_event.clear()
- timer2 = threading.Timer(0.01, self._callback_spy)
- timer2.start()
- self.callback_event.wait()
- self.assertEqual(len(self.callback_args), 2)
- self.assertEqual(self.callback_args, [((), {}), ((), {})])
- timer1.join()
- timer2.join()
- def _callback_spy(self, *args, **kwargs):
- self.callback_args.append((args[:], kwargs.copy()))
- self.callback_event.set()
- class LockTests(lock_tests.LockTests):
- locktype = staticmethod(threading.Lock)
- class PyRLockTests(lock_tests.RLockTests):
- locktype = staticmethod(threading._PyRLock)
- @unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
- class CRLockTests(lock_tests.RLockTests):
- locktype = staticmethod(threading._CRLock)
- class EventTests(lock_tests.EventTests):
- eventtype = staticmethod(threading.Event)
- class ConditionAsRLockTests(lock_tests.RLockTests):
- # Condition uses an RLock by default and exports its API.
- locktype = staticmethod(threading.Condition)
- class ConditionTests(lock_tests.ConditionTests):
- condtype = staticmethod(threading.Condition)
- class SemaphoreTests(lock_tests.SemaphoreTests):
- semtype = staticmethod(threading.Semaphore)
- class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
- semtype = staticmethod(threading.BoundedSemaphore)
- class BarrierTests(lock_tests.BarrierTests):
- barriertype = staticmethod(threading.Barrier)
- class MiscTestCase(unittest.TestCase):
- def test__all__(self):
- restore_default_excepthook(self)
- extra = {"ThreadError"}
- not_exported = {'currentThread', 'activeCount'}
- support.check__all__(self, threading, ('threading', '_thread'),
- extra=extra, not_exported=not_exported)
- class InterruptMainTests(unittest.TestCase):
- def check_interrupt_main_with_signal_handler(self, signum):
- def handler(signum, frame):
- 1/0
- old_handler = signal.signal(signum, handler)
- self.addCleanup(signal.signal, signum, old_handler)
- with self.assertRaises(ZeroDivisionError):
- _thread.interrupt_main()
- def check_interrupt_main_noerror(self, signum):
- handler = signal.getsignal(signum)
- try:
- # No exception should arise.
- signal.signal(signum, signal.SIG_IGN)
- _thread.interrupt_main(signum)
- signal.signal(signum, signal.SIG_DFL)
- _thread.interrupt_main(signum)
- finally:
- # Restore original handler
- signal.signal(signum, handler)
- def test_interrupt_main_subthread(self):
- # Calling start_new_thread with a function that executes interrupt_main
- # should raise KeyboardInterrupt upon completion.
- def call_interrupt():
- _thread.interrupt_main()
- t = threading.Thread(target=call_interrupt)
- with self.assertRaises(KeyboardInterrupt):
- t.start()
- t.join()
- t.join()
- def test_interrupt_main_mainthread(self):
- # Make sure that if interrupt_main is called in main thread that
- # KeyboardInterrupt is raised instantly.
- with self.assertRaises(KeyboardInterrupt):
- _thread.interrupt_main()
- def test_interrupt_main_with_signal_handler(self):
- self.check_interrupt_main_with_signal_handler(signal.SIGINT)
- self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
- def test_interrupt_main_noerror(self):
- self.check_interrupt_main_noerror(signal.SIGINT)
- self.check_interrupt_main_noerror(signal.SIGTERM)
- def test_interrupt_main_invalid_signal(self):
- self.assertRaises(ValueError, _thread.interrupt_main, -1)
- self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
- self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
- @threading_helper.reap_threads
- def test_can_interrupt_tight_loops(self):
- cont = [True]
- started = [False]
- interrupted = [False]
- def worker(started, cont, interrupted):
- iterations = 100_000_000
- started[0] = True
- while cont[0]:
- if iterations:
- iterations -= 1
- else:
- return
- pass
- interrupted[0] = True
- t = threading.Thread(target=worker,args=(started, cont, interrupted))
- t.start()
- while not started[0]:
- pass
- cont[0] = False
- t.join()
- self.assertTrue(interrupted[0])
- class AtexitTests(unittest.TestCase):
- def test_atexit_output(self):
- rc, out, err = assert_python_ok("-c", """if True:
- import threading
- def run_last():
- print('parrot')
- threading._register_atexit(run_last)
- """)
- self.assertFalse(err)
- self.assertEqual(out.strip(), b'parrot')
- def test_atexit_called_once(self):
- rc, out, err = assert_python_ok("-c", """if True:
- import threading
- from unittest.mock import Mock
- mock = Mock()
- threading._register_atexit(mock)
- mock.assert_not_called()
- # force early shutdown to ensure it was called once
- threading._shutdown()
- mock.assert_called_once()
- """)
- self.assertFalse(err)
- def test_atexit_after_shutdown(self):
- # The only way to do this is by registering an atexit within
- # an atexit, which is intended to raise an exception.
- rc, out, err = assert_python_ok("-c", """if True:
- import threading
- def func():
- pass
- def run_last():
- threading._register_atexit(func)
- threading._register_atexit(run_last)
- """)
- self.assertTrue(err)
- self.assertIn("RuntimeError: can't register atexit after shutdown",
- err.decode())
- if __name__ == "__main__":
- unittest.main()
|