- from test import support
- from test.support import import_helper
- from test.support import threading_helper
- # Skip tests if _multiprocessing wasn't built.
- import_helper.import_module('_multiprocessing')
- from test.support import hashlib_helper
- from test.support.script_helper import assert_python_ok
- import contextlib
- import itertools
- import logging
- from logging.handlers import QueueHandler
- import os
- import queue
- import sys
- import threading
- import time
- import unittest
- import weakref
- from pickle import PicklingError
- from concurrent import futures
- from concurrent.futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
- BrokenExecutor)
- from concurrent.futures.process import BrokenProcessPool, _check_system_limits
- import multiprocessing.process
- import multiprocessing.util
- import multiprocessing as mp
# Skip the whole module on sanitizer builds: the worker-pool tests are far
# too slow under instrumentation.
if support.check_sanitizer(address=True, memory=True):
    # bpo-46633: Skip the test because it is too slow when Python is built
    # with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
    raise unittest.SkipTest("test too slow on ASAN/MSAN build")
def create_future(state=PENDING, exception=None, result=None):
    """Build a Future whose private state is forced to the given values."""
    fut = Future()
    fut._state = state
    fut._exception = exception
    fut._result = result
    return fut
# Pre-built futures in every interesting state, shared by many tests below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Module-global flag written by the init() worker initializer.
INITIALIZER_STATUS = 'uninitialized'
def mul(x, y):
    """Return the product of *x* and *y* (a simple picklable workload)."""
    product = x * y
    return product
def capture(*args, **kwargs):
    """Echo back the positional and keyword arguments received."""
    return (args, kwargs)
def sleep_and_raise(t):
    """Sleep for *t* seconds, then always raise a generic Exception."""
    time.sleep(t)
    raise Exception('this is an exception')
def sleep_and_print(t, msg):
    """Sleep *t* seconds, then write *msg* to stdout and flush immediately."""
    time.sleep(t)
    print(msg, flush=True)
def init(x):
    """Worker initializer: record *x* in the module-global INITIALIZER_STATUS."""
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x
def get_init_status():
    """Return the value last recorded by init() in INITIALIZER_STATUS."""
    return INITIALIZER_STATUS
def init_fail(log_queue=None):
    """Failing worker initializer.

    When *log_queue* is given, route the 'concurrent.futures' logger output
    into it (so a parent process can inspect it), then raise ValueError.
    """
    if log_queue is not None:
        log = logging.getLogger('concurrent.futures')
        log.addHandler(QueueHandler(log_queue))
        log.setLevel('CRITICAL')
        log.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')
class MyObject(object):
    """Trivial picklable object used as a submit() target."""

    def my_method(self):
        """Do nothing; exists only so a bound method can be submitted."""
        pass
class EventfulGCObj():
    """Object that sets a manager Event when it is garbage-collected."""

    def __init__(self, mgr):
        # mgr is expected to provide an Event() factory (e.g. a
        # multiprocessing manager).
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()
def make_dummy_object(_):
    """Map helper: ignore the argument and return a fresh MyObject."""
    return MyObject()
class BaseTestCase(unittest.TestCase):
    """Common setup/teardown: detect leaked threads and reap child processes."""

    def setUp(self):
        self._thread_key = threading_helper.threading_setup()

    def tearDown(self):
        support.reap_children()
        threading_helper.threading_cleanup(*self._thread_key)
class ExecutorMixin:
    """Create self.executor in setUp() and shut it down in tearDown().

    Subclasses set executor_type and, for process pools, a ctx start-method
    name; executor_kwargs lets initializer tests add constructor arguments.
    """
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        if hasattr(self, "ctx"):
            # Process-pool flavour: pass the requested multiprocessing context.
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                mp_context=self.get_context(),
                **self.executor_kwargs)
        else:
            # Thread-pool flavour: no multiprocessing context involved.
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                **self.executor_kwargs)

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if support.verbose:
            print("%.2fs" % dt, end=' ')
        # Guard against deadlocked tests hanging the whole run.
        self.assertLess(dt, 300, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        # Resolve the multiprocessing context named by self.ctx.
        return mp.get_context(self.ctx)
class ThreadPoolMixin(ExecutorMixin):
    """Run the mixed-in tests against ThreadPoolExecutor."""
    executor_type = futures.ThreadPoolExecutor
class ProcessPoolForkMixin(ExecutorMixin):
    """Run the mixed-in tests against ProcessPoolExecutor with 'fork'."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            # The fork start method does not exist on Windows.
            self.skipTest("require unix system")
        return super().get_context()
class ProcessPoolSpawnMixin(ExecutorMixin):
    """Run the mixed-in tests against ProcessPoolExecutor with 'spawn'."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        return super().get_context()
class ProcessPoolForkserverMixin(ExecutorMixin):
    """Run the mixed-in tests against ProcessPoolExecutor with 'forkserver'."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        if sys.platform == "win32":
            # The forkserver start method does not exist on Windows.
            self.skipTest("require unix system")
        return super().get_context()
def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one concrete TestCase per executor flavour.

    Each generated class combines *mixin* with one executor mixin and the
    given *bases*, and is registered in the module globals under a
    '<Executor><Mixin>Test' name so unittest discovery picks it up.
    """
    def strip_mixin(name):
        # Drop a trailing 'Mixin'/'Tests' (5 chars) or 'Test' (4 chars).
        for suffix, size in (('Mixin', 5), ('Tests', 5), ('Test', 4)):
            if name.endswith(suffix):
                return name[:-size]
        return name

    for exe in executor_mixins:
        name = "%s%sTest" % (strip_mixin(exe.__name__),
                             strip_mixin(mixin.__name__))
        globals()[name] = type(name, (mixin, exe) + bases, {})
class InitializerMixin(ExecutorMixin):
    """Check that a successful initializer runs in every worker."""
    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        futures = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for f in futures:
            self.assertEqual(f.result(), 'initialized')
class FailingInitializerMixin(ExecutorMixin):
    """Check that a failing initializer breaks the executor and is logged."""
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()

            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)

            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        # Assert that *msg* appears in the logging output produced while the
        # managed block runs: for process pools, drain the queue the children
        # logged into; for thread pools, capture with assertLogs().
        if self.log_queue is not None:
            yield
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)
- create_executor_tests(InitializerMixin)
- create_executor_tests(FailingInitializerMixin)
class ExecutorShutdownTest:
    """Shutdown semantics shared by thread- and process-pool executors."""

    def test_run_after_shutdown(self):
        # submit() must raise once the executor has been shut down.
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        # shutdown() with pending work must not deadlock; all futures finish.
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        assert self.worker_count <= 5, "test needs few workers"
        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
        self.executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With few workers, many of
        # the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreater(len(cancelled), 20)

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0)

    def test_hang_gh83386(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://github.com/python/cpython/issues/83386.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs, see https://github.com/python/cpython/issues/83386")

        # NOTE(review): the set_start_method branch in the script below looks
        # dead — it only applies to thread pools (process pools were skipped
        # above), and those have no 'ctx', so {context!r} is None — and it
        # references 'multiprocessing' without importing it; verify upstream.
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                if {context!r}: multiprocessing.set_start_method({context!r})
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, 'ctx', None)))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown behaviour specific to ThreadPoolExecutor."""

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        # Block all three workers on a semaphore, then release and verify the
        # worker threads exit after shutdown().
        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        # Leaving the with-block must shut the pool down and join its threads.
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        # Dropping the last reference must trigger a clean shutdown.
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown behaviour specific to ProcessPoolExecutor."""

    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        if mp_context.get_start_method(allow_none=False) == "fork":
            # fork pre-spawns, not on demand.
            expected_num_processes = self.worker_count
        else:
            expected_num_processes = 3

        # Block three workers on a semaphore, release them, then verify all
        # worker processes exit after shutdown().
        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), expected_num_processes)
        for _ in range(3):
            sem.release()
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        # Leaving the with-block must shut the pool down and join workers.
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        # Dropping the last reference must trigger a clean shutdown.
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        # Fix: the original assigned executor_manager_thread twice; keep a
        # single assignment, grabbing every internal resource before del.
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
# Instantiate the process-pool shutdown tests for each start method.
create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class WaitTests:
    """Exercise futures.wait() under every return_when mode."""

    def test_20369(self):
        # See https://bugs.python.org/issue20369
        future = self.executor.submit(time.sleep, 1.5)
        done, not_done = futures.wait([future, future],
                                      return_when=futures.ALL_COMPLETED)
        self.assertEqual({future}, done)
        self.assertEqual(set(), not_done)

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, future1, future2],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
            finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [future1, future2, future3],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             future1, future2],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, future1],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             future1,
             future2],
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        # future2 sleeps longer than the 5 s timeout, so it must remain
        # pending when wait() returns.
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        # A tiny switch interval maximizes thread interleaving to provoke
        # the race; always restored in the finally clause.
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)
# Instantiate the wait() tests for each process-pool start method
# (the thread-pool variant is defined explicitly above).
create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class AsCompletedTests:
    """Exercise futures.as_completed()."""
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2]))
        self.assertEqual(set(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2]),
            completed)

    def test_zero_timeout(self):
        # With timeout=0 only already-finished futures are yielded before
        # TimeoutError is raised.
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
- create_executor_tests(AsCompletedTests)
class ExecutorTest:
    """Core submit()/map() behaviour shared by all executor types."""

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        # 'self' and 'fn' must be forwarded as task keywords, not swallowed
        # by submit()'s own signature.
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            list(map(pow, range(10), range(10))))

        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10), chunksize=3)),
            list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        # The iterator yields results until the failing call is reached.
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    """Behaviour specific to ThreadPoolExecutor."""

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        # map() must submit the work even if the result iterator is never
        # consumed.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        # The pool must never grow past _max_workers, however much work is
        # queued.
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        # Sequential tasks should reuse the one idle worker rather than
        # spawning new threads.
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)

    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            pool.submit(submit, pool)

            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                    workers.submit(tuple)

    def test_executor_map_current_future_cancel(self):
        stop_event = threading.Event()
        log = []

        def log_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        with self.executor_type(max_workers=1) as pool:
            # submit work to saturate the pool
            fut = pool.submit(log_n_wait, ident="first")
            try:
                with contextlib.closing(
                    pool.map(log_n_wait, ["second", "third"], timeout=0)
                ) as gen:
                    with self.assertRaises(TimeoutError):
                        next(gen)
            finally:
                stop_event.set()
            fut.result()
        # ident='second' is cancelled as a result of raising a TimeoutError
        # ident='third' is cancelled because it remained in the collection of futures
        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
class ProcessPoolExecutorTest(ExecutorTest):
    """Process-pool-specific behaviors on top of the shared ExecutorTest suite."""

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        # Windows WaitForMultipleObjects limits the pool to 61 workers.
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        futures = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in futures:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            # chunksize must be >= 1.
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        # Results must be identical whether the chunk size divides the
        # input evenly, exceeds it, or matches it exactly.
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        # NOTE: the trailing comment is load-bearing; test_traceback greps
        # for this exact source line in the remote traceback text.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that arguments for a job are correctly gc-ed after the job
        # is finished
        mgr = self.get_context().Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while manager is still running.
        obj = None
        support.gc_collect()

        mgr.shutdown()
        mgr.join()

    def test_saturation(self):
        executor = self.executor
        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        # Queue far more jobs than workers: the pool must not spawn more
        # than max_workers processes.
        job_count = 15 * executor._max_workers
        for _ in range(job_count):
            executor.submit(sem.acquire)
        self.assertEqual(len(executor._processes), executor._max_workers)
        for _ in range(job_count):
            sem.release()

    def test_idle_process_reuse_one(self):
        executor = self.executor
        assert executor._max_workers >= 4
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # Sequential jobs should all be served by one reused idle worker.
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)

    def test_idle_process_reuse_multiple(self):
        executor = self.executor
        assert executor._max_workers <= 5
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # Interleave waited and fire-and-forget jobs; the pool should need
        # at most a few workers, not one per job.
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        executor.submit(mul, 1, 2).result()
        executor.submit(mul, 0, 9)
        self.assertLessEqual(len(executor._processes), 3)
        executor.shutdown()

    def test_max_tasks_per_child(self):
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            # max_tasks_per_child is not supported with the fork start method.
            with self.assertRaises(ValueError):
                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
            return
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                1, mp_context=context, max_tasks_per_child=3)
        f1 = executor.submit(os.getpid)
        original_pid = f1.result()
        # The worker pid remains the same as the worker could be reused
        f2 = executor.submit(os.getpid)
        self.assertEqual(f2.result(), original_pid)
        self.assertEqual(len(executor._processes), 1)
        f3 = executor.submit(os.getpid)
        self.assertEqual(f3.result(), original_pid)

        # A new worker is spawned, with a statistically different pid,
        # while the previous was reaped.
        f4 = executor.submit(os.getpid)
        new_pid = f4.result()
        self.assertNotEqual(original_pid, new_pid)
        self.assertEqual(len(executor._processes), 1)

        executor.shutdown()

    def test_max_tasks_per_child_defaults_to_spawn_context(self):
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(1, max_tasks_per_child=3)
        self.assertEqual(executor._mp_context.get_start_method(), "spawn")

    def test_max_tasks_early_shutdown(self):
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # not using self.executor as we need to control construction.
        # arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
                3, mp_context=context, max_tasks_per_child=1)
        futures = []
        for i in range(6):
            futures.append(executor.submit(mul, i, i))
        executor.shutdown()
        # All submitted work must still complete despite worker recycling.
        for i, future in enumerate(futures):
            self.assertEqual(future.result(), mul(i, i))
# Generate concrete TestCase classes for each process start method
# (fork, forkserver, spawn) from the shared ProcessPoolExecutorTest body.
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    # Disable faulthandler so the deliberate segfault does not dump a
    # traceback to stderr before killing the process.
    faulthandler.disable()
    faulthandler._sigsegv()
- def _exit():
- """Induces a sys exit with exitcode 1."""
- sys.exit(1)
- def _raise_error(Err):
- """Function that raises an Exception in process."""
- raise Err()
def _raise_error_ignore_stderr(Err):
    """Function that raises an Exception in process and ignores stderr."""
    import io
    # Redirect this process's stderr into a throwaway buffer so the
    # traceback printed on exit does not pollute the test output.
    sys.stderr = io.StringIO()
    raise Err()
- def _return_instance(cls):
- """Function that returns a instance of cls."""
- return cls()
class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        # Called by pickle: crashes whichever process tries to serialize us.
        _crash()
class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the returned callable crashes the
        # deserializing process instead.
        return _crash, ()
class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        # Called by pickle: exits whichever process tries to serialize us.
        _exit()
class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        # Pickling succeeds; the returned callable exits the
        # deserializing process instead.
        return _exit, ()
class ErrorAtPickle(object):
    """Bad object that raises PicklingError when pickle tries to reduce it."""

    def __reduce__(self):
        from pickle import PicklingError
        error = PicklingError("Error in pickle")
        raise error
class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        # Pickling succeeds; unpickling calls the helper, which raises
        # UnpicklingError in the deserializing process (stderr silenced).
        return _raise_error_ignore_stderr, (UnpicklingError, )
class ExecutorDeadlockTest:
    """Check that crashes/exits in workers break the pool instead of deadlocking."""

    # Maximum time we allow the pool to recover before declaring a deadlock.
    TIMEOUT = support.SHORT_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            # Capture a traceback of every thread for the failure report.
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # test for deadlock caused by crashes in a pool
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown do not cause deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that the pool calling shutdown with wait=False does not cause
        # a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor is eventually shutdown and do not leave
        # dangling threads
        executor_manager.join()
# Generate concrete TestCase classes for each process start method
# (fork, forkserver, spawn) from the shared ExecutorDeadlockTest body.
create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
class FutureTests(BaseTestCase):
    """Unit tests for the Future state machine, callbacks, and repr."""

    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        # Callback registered before completion fires when the result is set.
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        with support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            # A raising callback must not prevent later callbacks from
            # running; its exception is logged to stderr instead.
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        # Adding a callback to an already-finished future runs it immediately.
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        with support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        # Only pending and already-cancelled futures report a successful
        # cancel; running and finished futures refuse.
        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertEqual(f1.result(timeout=5), 42)
        t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertRaises(futures.CancelledError,
                          f1.result, timeout=support.SHORT_TIMEOUT)
        t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
                                   OSError))
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
        t.join()

    def test_multiple_set_result(self):
        f = create_future(state=PENDING)
        f.set_result(1)

        # A finished future must reject a second result.
        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        # A finished future must reject a second exception.
        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)
def setUpModule():
    # Register module-level cleanups so multiprocessing resources and any
    # threads started by the tests are reaped when the module finishes.
    unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
    thread_info = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
- if __name__ == "__main__":
- unittest.main()
|