# test_concurrent_futures.py
# (extraction residue removed: the original paste carried the file-size
#  banner and a run of concatenated line numbers here)
# Test-support imports.  NOTE: order matters here — the import_module()
# call below must run before anything that requires multiprocessing so the
# whole test module is skipped cleanly when _multiprocessing is missing.
from test import support
from test.support import import_helper
from test.support import threading_helper

# Skip tests if _multiprocessing wasn't built.
import_helper.import_module('_multiprocessing')

from test.support import hashlib_helper
from test.support.script_helper import assert_python_ok

import contextlib
import itertools
import logging
from logging.handlers import QueueHandler
import os
import queue
import sys
import threading
import time
import unittest
import weakref
from pickle import PicklingError

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
    BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool, _check_system_limits

import multiprocessing.process
import multiprocessing.util
import multiprocessing as mp


if support.check_sanitizer(address=True, memory=True):
    # bpo-46633: Skip the test because it is too slow when Python is built
    # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
    raise unittest.SkipTest("test too slow on ASAN/MSAN build")
  32. def create_future(state=PENDING, exception=None, result=None):
  33. f = Future()
  34. f._state = state
  35. f._exception = exception
  36. f._result = result
  37. return f
# Pre-built futures in every interesting state; shared (read-only) by many
# of the wait()/as_completed() tests below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Module-global flag written by init() and read by get_init_status(); lets
# workers report whether their initializer ran.
INITIALIZER_STATUS = 'uninitialized'
  45. def mul(x, y):
  46. return x * y
  47. def capture(*args, **kwargs):
  48. return args, kwargs
  49. def sleep_and_raise(t):
  50. time.sleep(t)
  51. raise Exception('this is an exception')
  52. def sleep_and_print(t, msg):
  53. time.sleep(t)
  54. print(msg)
  55. sys.stdout.flush()
  56. def init(x):
  57. global INITIALIZER_STATUS
  58. INITIALIZER_STATUS = x
  59. def get_init_status():
  60. return INITIALIZER_STATUS
  61. def init_fail(log_queue=None):
  62. if log_queue is not None:
  63. logger = logging.getLogger('concurrent.futures')
  64. logger.addHandler(QueueHandler(log_queue))
  65. logger.setLevel('CRITICAL')
  66. logger.propagate = False
  67. time.sleep(0.1) # let some futures be scheduled
  68. raise ValueError('error in initializer')
  69. class MyObject(object):
  70. def my_method(self):
  71. pass
  72. class EventfulGCObj():
  73. def __init__(self, mgr):
  74. self.event = mgr.Event()
  75. def __del__(self):
  76. self.event.set()
  77. def make_dummy_object(_):
  78. return MyObject()
  79. class BaseTestCase(unittest.TestCase):
  80. def setUp(self):
  81. self._thread_key = threading_helper.threading_setup()
  82. def tearDown(self):
  83. support.reap_children()
  84. threading_helper.threading_cleanup(*self._thread_key)
  85. class ExecutorMixin:
  86. worker_count = 5
  87. executor_kwargs = {}
  88. def setUp(self):
  89. super().setUp()
  90. self.t1 = time.monotonic()
  91. if hasattr(self, "ctx"):
  92. self.executor = self.executor_type(
  93. max_workers=self.worker_count,
  94. mp_context=self.get_context(),
  95. **self.executor_kwargs)
  96. else:
  97. self.executor = self.executor_type(
  98. max_workers=self.worker_count,
  99. **self.executor_kwargs)
  100. def tearDown(self):
  101. self.executor.shutdown(wait=True)
  102. self.executor = None
  103. dt = time.monotonic() - self.t1
  104. if support.verbose:
  105. print("%.2fs" % dt, end=' ')
  106. self.assertLess(dt, 300, "synchronization issue: test lasted too long")
  107. super().tearDown()
  108. def get_context(self):
  109. return mp.get_context(self.ctx)
class ThreadPoolMixin(ExecutorMixin):
    # Runs the mixed-in tests against ThreadPoolExecutor (no ``ctx``
    # attribute, so ExecutorMixin skips the mp_context= path).
    executor_type = futures.ThreadPoolExecutor
class ProcessPoolForkMixin(ExecutorMixin):
    # Runs the mixed-in tests against a fork-based ProcessPoolExecutor.
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        # Skip when process pools can't work at all on this system, then
        # skip on Windows (fork is a unix-only start method).
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()
class ProcessPoolSpawnMixin(ExecutorMixin):
    # Runs the mixed-in tests against a spawn-based ProcessPoolExecutor
    # (spawn works on every platform, so no win32 skip here).
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        return super().get_context()
class ProcessPoolForkserverMixin(ExecutorMixin):
    # Runs the mixed-in tests against a forkserver-based
    # ProcessPoolExecutor (unix-only, like fork).
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()
  143. def create_executor_tests(mixin, bases=(BaseTestCase,),
  144. executor_mixins=(ThreadPoolMixin,
  145. ProcessPoolForkMixin,
  146. ProcessPoolForkserverMixin,
  147. ProcessPoolSpawnMixin)):
  148. def strip_mixin(name):
  149. if name.endswith(('Mixin', 'Tests')):
  150. return name[:-5]
  151. elif name.endswith('Test'):
  152. return name[:-4]
  153. else:
  154. return name
  155. for exe in executor_mixins:
  156. name = ("%s%sTest"
  157. % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
  158. cls = type(name, (mixin,) + (exe,) + bases, {})
  159. globals()[name] = cls
  160. class InitializerMixin(ExecutorMixin):
  161. worker_count = 2
  162. def setUp(self):
  163. global INITIALIZER_STATUS
  164. INITIALIZER_STATUS = 'uninitialized'
  165. self.executor_kwargs = dict(initializer=init,
  166. initargs=('initialized',))
  167. super().setUp()
  168. def test_initializer(self):
  169. futures = [self.executor.submit(get_init_status)
  170. for _ in range(self.worker_count)]
  171. for f in futures:
  172. self.assertEqual(f.result(), 'initialized')
class FailingInitializerMixin(ExecutorMixin):
    # Verifies that a failing initializer breaks the executor and that the
    # failure is reported through the 'concurrent.futures' logger.
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()

            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)

            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        # Assert that *msg* shows up in the executor's logging output
        # while the with-block runs: drained from the queue for process
        # pools, captured via assertLogs() for thread pools.
        if self.log_queue is not None:
            yield
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
                output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)
  224. create_executor_tests(InitializerMixin)
  225. create_executor_tests(FailingInitializerMixin)
  226. class ExecutorShutdownTest:
  227. def test_run_after_shutdown(self):
  228. self.executor.shutdown()
  229. self.assertRaises(RuntimeError,
  230. self.executor.submit,
  231. pow, 2, 5)
  232. def test_interpreter_shutdown(self):
  233. # Test the atexit hook for shutdown of worker threads and processes
  234. rc, out, err = assert_python_ok('-c', """if 1:
  235. from concurrent.futures import {executor_type}
  236. from time import sleep
  237. from test.test_concurrent_futures import sleep_and_print
  238. if __name__ == "__main__":
  239. context = '{context}'
  240. if context == "":
  241. t = {executor_type}(5)
  242. else:
  243. from multiprocessing import get_context
  244. context = get_context(context)
  245. t = {executor_type}(5, mp_context=context)
  246. t.submit(sleep_and_print, 1.0, "apple")
  247. """.format(executor_type=self.executor_type.__name__,
  248. context=getattr(self, "ctx", "")))
  249. # Errors in atexit hooks don't change the process exit code, check
  250. # stderr manually.
  251. self.assertFalse(err)
  252. self.assertEqual(out.strip(), b"apple")
  253. def test_submit_after_interpreter_shutdown(self):
  254. # Test the atexit hook for shutdown of worker threads and processes
  255. rc, out, err = assert_python_ok('-c', """if 1:
  256. import atexit
  257. @atexit.register
  258. def run_last():
  259. try:
  260. t.submit(id, None)
  261. except RuntimeError:
  262. print("runtime-error")
  263. raise
  264. from concurrent.futures import {executor_type}
  265. if __name__ == "__main__":
  266. context = '{context}'
  267. if not context:
  268. t = {executor_type}(5)
  269. else:
  270. from multiprocessing import get_context
  271. context = get_context(context)
  272. t = {executor_type}(5, mp_context=context)
  273. t.submit(id, 42).result()
  274. """.format(executor_type=self.executor_type.__name__,
  275. context=getattr(self, "ctx", "")))
  276. # Errors in atexit hooks don't change the process exit code, check
  277. # stderr manually.
  278. self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
  279. self.assertEqual(out.strip(), b"runtime-error")
  280. def test_hang_issue12364(self):
  281. fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
  282. self.executor.shutdown()
  283. for f in fs:
  284. f.result()
  285. def test_cancel_futures(self):
  286. assert self.worker_count <= 5, "test needs few workers"
  287. fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
  288. self.executor.shutdown(cancel_futures=True)
  289. # We can't guarantee the exact number of cancellations, but we can
  290. # guarantee that *some* were cancelled. With few workers, many of
  291. # the submitted futures should have been cancelled.
  292. cancelled = [fut for fut in fs if fut.cancelled()]
  293. self.assertGreater(len(cancelled), 20)
  294. # Ensure the other futures were able to finish.
  295. # Use "not fut.cancelled()" instead of "fut.done()" to include futures
  296. # that may have been left in a pending state.
  297. others = [fut for fut in fs if not fut.cancelled()]
  298. for fut in others:
  299. self.assertTrue(fut.done(), msg=f"{fut._state=}")
  300. self.assertIsNone(fut.exception())
  301. # Similar to the number of cancelled futures, we can't guarantee the
  302. # exact number that completed. But, we can guarantee that at least
  303. # one finished.
  304. self.assertGreater(len(others), 0)
  305. def test_hang_gh83386(self):
  306. """shutdown(wait=False) doesn't hang at exit with running futures.
  307. See https://github.com/python/cpython/issues/83386.
  308. """
  309. if self.executor_type == futures.ProcessPoolExecutor:
  310. raise unittest.SkipTest(
  311. "Hangs, see https://github.com/python/cpython/issues/83386")
  312. rc, out, err = assert_python_ok('-c', """if True:
  313. from concurrent.futures import {executor_type}
  314. from test.test_concurrent_futures import sleep_and_print
  315. if __name__ == "__main__":
  316. if {context!r}: multiprocessing.set_start_method({context!r})
  317. t = {executor_type}(max_workers=3)
  318. t.submit(sleep_and_print, 1.0, "apple")
  319. t.shutdown(wait=False)
  320. """.format(executor_type=self.executor_type.__name__,
  321. context=getattr(self, 'ctx', None)))
  322. self.assertFalse(err)
  323. self.assertEqual(out.strip(), b"apple")
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    # Thread-pool-specific shutdown behaviour, on top of the shared
    # ExecutorShutdownTest cases.

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        # Block three workers on the semaphore so three threads must exist.
        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
        # Leaving the with-block shuts the pool down; threads must exit.
        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
  402. class ProcessPoolShutdownTest(ExecutorShutdownTest):
  403. def test_processes_terminate(self):
  404. def acquire_lock(lock):
  405. lock.acquire()
  406. mp_context = self.get_context()
  407. if mp_context.get_start_method(allow_none=False) == "fork":
  408. # fork pre-spawns, not on demand.
  409. expected_num_processes = self.worker_count
  410. else:
  411. expected_num_processes = 3
  412. sem = mp_context.Semaphore(0)
  413. for _ in range(3):
  414. self.executor.submit(acquire_lock, sem)
  415. self.assertEqual(len(self.executor._processes), expected_num_processes)
  416. for _ in range(3):
  417. sem.release()
  418. processes = self.executor._processes
  419. self.executor.shutdown()
  420. for p in processes.values():
  421. p.join()
  422. def test_context_manager_shutdown(self):
  423. with futures.ProcessPoolExecutor(
  424. max_workers=5, mp_context=self.get_context()) as e:
  425. processes = e._processes
  426. self.assertEqual(list(e.map(abs, range(-5, 5))),
  427. [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
  428. for p in processes.values():
  429. p.join()
  430. def test_del_shutdown(self):
  431. executor = futures.ProcessPoolExecutor(
  432. max_workers=5, mp_context=self.get_context())
  433. res = executor.map(abs, range(-5, 5))
  434. executor_manager_thread = executor._executor_manager_thread
  435. processes = executor._processes
  436. call_queue = executor._call_queue
  437. executor_manager_thread = executor._executor_manager_thread
  438. del executor
  439. support.gc_collect() # For PyPy or other GCs.
  440. # Make sure that all the executor resources were properly cleaned by
  441. # the shutdown process
  442. executor_manager_thread.join()
  443. for p in processes.values():
  444. p.join()
  445. call_queue.join_thread()
  446. # Make sure the results were all computed before the
  447. # executor got shutdown.
  448. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
  449. def test_shutdown_no_wait(self):
  450. # Ensure that the executor cleans up the processes when calling
  451. # shutdown with wait=False
  452. executor = futures.ProcessPoolExecutor(
  453. max_workers=5, mp_context=self.get_context())
  454. res = executor.map(abs, range(-5, 5))
  455. processes = executor._processes
  456. call_queue = executor._call_queue
  457. executor_manager_thread = executor._executor_manager_thread
  458. executor.shutdown(wait=False)
  459. # Make sure that all the executor resources were properly cleaned by
  460. # the shutdown process
  461. executor_manager_thread.join()
  462. for p in processes.values():
  463. p.join()
  464. call_queue.join_thread()
  465. # Make sure the results were all computed before the executor got
  466. # shutdown.
  467. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
# Register only the process-pool variants: the thread-pool variant is the
# hand-written ThreadPoolShutdownTest above.
create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class WaitTests:
    # Tests for futures.wait() against a live executor; mixes pre-built
    # module-level futures (CANCELLED_FUTURE etc.) with freshly submitted
    # ones.  Sleep durations are chosen so the ordering is deterministic.

    def test_20369(self):
        # See https://bugs.python.org/issue20369
        future = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait([future, future],
                                      return_when=futures.ALL_COMPLETED)
        # A duplicated future must be reported once.
        self.assertEqual({future}, done)
        self.assertEqual(set(), not_done)

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait(
                [CANCELLED_FUTURE, future1, future2],
                return_when=futures.FIRST_COMPLETED)
        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)
        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
                return_when=futures.FIRST_COMPLETED)
        self.assertEqual(
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
                finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)
        finished, pending = futures.wait(
                [future1, future2, future3],
                return_when=futures.FIRST_EXCEPTION)
        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)
        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 future1, future2],
                return_when=futures.FIRST_EXCEPTION)
        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)
        finished, pending = futures.wait(
                [EXCEPTION_FUTURE, future1],
                return_when=futures.FIRST_EXCEPTION)
        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)
        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 future1,
                 future2],
                return_when=futures.ALL_COMPLETED)
        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        # 6 s sleep > 5 s timeout, so future2 must still be pending.
        future2 = self.executor.submit(time.sleep, 6)
        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2],
                timeout=5,
                return_when=futures.ALL_COMPLETED)
        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        # Force very frequent thread switches to make the race likely.
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter's switch interval.
            sys.setswitchinterval(oldswitchinterval)
# WaitTests already has a dedicated thread-pool subclass above, so only
# the process-pool variants are generated here.
create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class AsCompletedTests:
    # Tests for futures.as_completed() against a live executor.
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.

    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)
        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_zero_timeout(self):
        # With timeout=0 only the already-finished futures are yielded
        # before TimeoutError is raised.
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
            # Finish one more future so iteration can continue.
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        # PENDING_FUTURE and RUNNING_FUTURE are the two unfinished ones.
        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')


create_executor_tests(AsCompletedTests)
class ExecutorTest:
    """Behavior tests shared by all executor types.

    Mixed into concrete test classes by create_executor_tests(); the mixin
    supplies ``self.executor``, ``self.executor_type`` and
    ``self.worker_count``.
    """

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        # Keyword arguments are forwarded to the submitted callable.
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        # 'self' and 'fn' are legal keyword names for the callable because
        # Executor.submit() takes its own arguments positionally.
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        # submit() itself still requires the callable as a positional arg.
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        # Executor.map() must agree with the builtin map(), both with the
        # default chunking and an explicit chunksize.
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10))),
                list(map(pow, range(10), range(10))))

        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
                list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        # An exception raised by a task is re-raised when the result
        # iterator reaches that task's slot.
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        # Results produced before the deadline are kept; iterating past it
        # raises TimeoutError.
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    """ThreadPoolExecutor-specific behavior tests."""

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        # map() must submit all tasks even if its result iterator is never
        # consumed.
        finished = []
        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        # Default max_workers is min(32, cpu_count + 4).
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        # Flooding the pool with blocked tasks must spawn exactly
        # max_workers threads, no more.
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        # Sequential jobs should be served by a single, reused thread.
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)

    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            pool.submit(submit, pool)

            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                    workers.submit(tuple)

    def test_executor_map_current_future_cancel(self):
        # Closing a timed-out map() generator must cancel the pending
        # futures so their tasks never run.
        stop_event = threading.Event()
        log = []

        def log_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        with self.executor_type(max_workers=1) as pool:
            # submit work to saturate the pool
            fut = pool.submit(log_n_wait, ident="first")
            try:
                with contextlib.closing(
                    pool.map(log_n_wait, ["second", "third"], timeout=0)
                ) as gen:
                    with self.assertRaises(TimeoutError):
                        next(gen)
            finally:
                stop_event.set()
            fut.result()
        # ident='second' is cancelled as a result of raising a TimeoutError
        # ident='third' is cancelled because it remained in the collection of futures
        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
class ProcessPoolExecutorTest(ExecutorTest):
    """ProcessPoolExecutor-specific behavior tests.

    Combined with the fork/forkserver/spawn mixins by
    create_executor_tests() below.
    """

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        # Windows WaitForMultipleObjects limits the pool to 61 workers.
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        futures = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in futures:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        # Any positive chunksize (smaller, larger or equal to the input
        # length) must give the same results as the builtin map().
        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        # Non-positive chunksizes are rejected.
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        # Runs in a worker; the exact source text of the next line
        # (including its trailing comment) is asserted on below.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that argument for a job are correctly gc-ed after the job
        # is finished
        mgr = self.get_context().Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while manager is still running.
        obj = None
        support.gc_collect()

        mgr.shutdown()
        mgr.join()

    def test_saturation(self):
        # Flooding the pool with blocked tasks must spawn exactly
        # max_workers processes, no more.
        executor = self.executor
        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        for _ in range(job_count):
            executor.submit(sem.acquire)
        self.assertEqual(len(executor._processes), executor._max_workers)
        for _ in range(job_count):
            sem.release()

    def test_idle_process_reuse_one(self):
        # Sequential jobs should be served by a single, reused worker.
        executor = self.executor
        assert executor._max_workers >= 4
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)

    def test_idle_process_reuse_multiple(self):
        # Interleaved waited/unwaited jobs should not spawn more workers
        # than there are concurrently pending tasks.
        executor = self.executor
        assert executor._max_workers <= 5
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        executor.submit(mul, 1, 2).result()
        executor.submit(mul, 0, 9)
        self.assertLessEqual(len(executor._processes), 3)
        executor.shutdown()

    def test_max_tasks_per_child(self):
        # max_tasks_per_child requires a start method other than "fork".
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            with self.assertRaises(ValueError):
                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
            return
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                1, mp_context=context, max_tasks_per_child=3)
        f1 = executor.submit(os.getpid)
        original_pid = f1.result()
        # The worker pid remains the same as the worker could be reused
        f2 = executor.submit(os.getpid)
        self.assertEqual(f2.result(), original_pid)
        self.assertEqual(len(executor._processes), 1)
        f3 = executor.submit(os.getpid)
        self.assertEqual(f3.result(), original_pid)

        # A new worker is spawned, with a statistically different pid,
        # while the previous was reaped.
        f4 = executor.submit(os.getpid)
        new_pid = f4.result()
        self.assertNotEqual(original_pid, new_pid)
        self.assertEqual(len(executor._processes), 1)

        executor.shutdown()

    def test_max_tasks_per_child_defaults_to_spawn_context(self):
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(1, max_tasks_per_child=3)
        self.assertEqual(executor._mp_context.get_start_method(), "spawn")

    def test_max_tasks_early_shutdown(self):
        # Workers that hit their task limit during shutdown must not lose
        # already-submitted results.
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                3, mp_context=context, max_tasks_per_child=1)
        futures = []
        for i in range(6):
            futures.append(executor.submit(mul, i, i))
        executor.shutdown()
        for i, future in enumerate(futures):
            self.assertEqual(future.result(), mul(i, i))
# Generate concrete ProcessPoolExecutorTest subclasses, one per
# multiprocessing start method (fork, forkserver, spawn).
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
def _crash(delay=None):
    """Induces a segfault.

    Runs in a worker process; *delay* (seconds) optionally postpones the
    crash.  faulthandler is disabled first so the SIGSEGV is not turned
    into a Python-level traceback.
    """
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()
  941. def _exit():
  942. """Induces a sys exit with exitcode 1."""
  943. sys.exit(1)
  944. def _raise_error(Err):
  945. """Function that raises an Exception in process."""
  946. raise Err()
  947. def _raise_error_ignore_stderr(Err):
  948. """Function that raises an Exception in process and ignores stderr."""
  949. import io
  950. sys.stderr = io.StringIO()
  951. raise Err()
  952. def _return_instance(cls):
  953. """Function that returns a instance of cls."""
  954. return cls()
class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        # Crash while pickle is serializing the object itself.
        _crash()
class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the crash happens when the receiver calls
        # _crash() to reconstruct the object.
        return _crash, ()
class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        # Exit while pickle is serializing the object itself.
        _exit()
class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the receiver exits when reconstructing.
        return _exit, ()
  971. class ErrorAtPickle(object):
  972. """Bad object that triggers an error at pickling time."""
  973. def __reduce__(self):
  974. from pickle import PicklingError
  975. raise PicklingError("Error in pickle")
class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the receiver raises UnpicklingError (with its
        # stderr silenced) when reconstructing.
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError, )
class ExecutorDeadlockTest:
    """Regression tests: a crashing/exiting/unpicklable task must break the
    pool cleanly rather than deadlock it.

    Combined with the process-pool start-method mixins by
    create_executor_tests() below.
    """
    TIMEOUT = support.SHORT_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # test for deadlock caused by crashes in a pool
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown do not cause deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that the pool calling shutdown with wait=False does not cause
        # a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor is eventually shutdown and do not leave
        # dangling threads
        executor_manager.join()
# Generate concrete ExecutorDeadlockTest subclasses, one per
# multiprocessing start method (fork, forkserver, spawn).
create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class FutureTests(BaseTestCase):
    """Unit tests for Future state transitions, callbacks and accessors."""

    def test_done_callback_with_result(self):
        # A callback added before completion runs when set_result() fires.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        # A raising callback is logged to stderr and must not prevent later
        # callbacks from running.
        with support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        # A callback added after completion runs immediately.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        # A callback that raises when run immediately (future already done)
        # is reported via stderr, not propagated to the caller.
        with support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        # cancel() succeeds only for pending or already-cancelled futures.
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertEqual(f1.result(timeout=5), 42)
        t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertRaises(futures.CancelledError,
                          f1.result, timeout=support.SHORT_TIMEOUT)
        t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
                                   OSError))
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
        t.join()

    def test_multiple_set_result(self):
        # Completing a future twice raises InvalidStateError and keeps the
        # first result.
        f = create_future(state=PENDING)
        f.set_result(1)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)
def setUpModule():
    """Module-level setup: register multiprocessing cleanup and verify that
    the tests do not leak threads."""
    unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
    thread_info = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)


if __name__ == "__main__":
    unittest.main()