| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677 |
- import inspect
- import types
- import unittest
- import contextlib
- from test.support.import_helper import import_module
- from test.support import gc_collect, requires_working_socket
- asyncio = import_module("asyncio")
- requires_working_socket(module=True)
- _no_default = object()
class AwaitException(Exception):
    """Exception that run_until_complete() throws into a coroutine.

    It is thrown on the resumption that follows a yielded ``('throw',)``.
    """
@types.coroutine
def awaitable(*, throw=False):
    """Generator-based coroutine that yields exactly one marker tuple.

    Yields ``('throw',)`` when *throw* is true and ``('result',)``
    otherwise.
    """
    marker = ('throw',) if throw else ('result',)
    yield marker
def run_until_complete(coro):
    """Drive *coro* to completion without an event loop; return its result.

    Every value the coroutine yields is inspected.  When it equals
    ``('throw',)`` the next resumption throws ``AwaitException`` into the
    coroutine; otherwise the coroutine is resumed with ``send(None)``.
    Exceptions other than ``StopIteration`` propagate to the caller.
    """
    throw_next = False
    while True:
        try:
            if throw_next:
                throw_next = False
                yielded = coro.throw(AwaitException)
            else:
                yielded = coro.send(None)
        except StopIteration as stop:
            # ``stop.value`` is None when the coroutine returned nothing;
            # indexing ``stop.args[0]`` would raise IndexError in that case.
            return stop.value
        if yielded == ('throw',):
            throw_next = True
def to_list(gen):
    """Return a list of every item produced by the async iterable *gen*.

    The iteration is driven by run_until_complete(), not an event loop.
    """
    async def collect():
        return [item async for item in gen]

    return run_until_complete(collect())
def py_anext(iterator, default=_no_default):
    """Pure-Python stand-in for the builtin anext(), for cross-testing.

    Without *default*, the result of ``type(iterator).__anext__(iterator)``
    is returned as is.  With *default*, a coroutine is returned that awaits
    that result and turns StopAsyncIteration into *default*.

    Raises TypeError when the type of *iterator* has no ``__anext__``.
    """
    try:
        anext_method = type(iterator).__anext__
    except AttributeError:
        raise TypeError(f'{iterator!r} is not an async iterator')

    if default is _no_default:
        return anext_method(iterator)

    async def anext_impl():
        # The C implementation works at a much lower level and implements
        # every method of the iterator protocol itself.  Relying on plain
        # coroutine machinery here is intentional: the point is to
        # cross-check a high-level Python version against the C one.
        try:
            return await anext_method(iterator)
        except StopAsyncIteration:
            return default

    return anext_impl()
class AsyncGenSyntaxTest(unittest.TestCase):
    """Constructs that must be rejected with SyntaxError in async defs."""

    def _assert_rejected(self, code, message_regex):
        """Check that exec'ing *code* raises a matching SyntaxError."""
        with self.assertRaisesRegex(SyntaxError, message_regex):
            exec(code, {}, {})

    def test_async_gen_syntax_01(self):
        # 'yield from' following an await.
        code = '''async def foo():
            await abc
            yield from 123
        '''
        self._assert_rejected(code, 'yield from.*inside async')

    def test_async_gen_syntax_02(self):
        # 'yield from' as the only statement.
        code = '''async def foo():
            yield from 123
        '''
        self._assert_rejected(code, 'yield from.*inside async')

    def test_async_gen_syntax_03(self):
        # 'return <value>' in a body that both awaits and yields.
        code = '''async def foo():
            await abc
            yield
            return 123
        '''
        self._assert_rejected(code, 'return.*value.*async gen')

    def test_async_gen_syntax_04(self):
        # 'return <value>' directly after a yield.
        code = '''async def foo():
            yield
            return 123
        '''
        self._assert_rejected(code, 'return.*value.*async gen')

    def test_async_gen_syntax_05(self):
        # The yield sits under 'if 0:' but the function still has one.
        code = '''async def foo():
            if 0:
                yield
            return 12
        '''
        self._assert_rejected(code, 'return.*value.*async gen')
class AsyncGenTest(unittest.TestCase):
    """Async generator tests driven by hand, without an event loop."""

    def compare_generators(self, sync_gen, async_gen):
        """Exhaust both generators and assert they behave the same.

        Each generator is reduced to a list: yielded values, ``str`` of the
        type of any exception raised, and a final ``'STOP'``.  The two
        lists must be equal; the async generator's list is returned.
        """
        def sync_iterate(g):
            trace = []
            while True:
                try:
                    trace.append(g.__next__())
                except StopIteration:
                    trace.append('STOP')
                    break
                except Exception as ex:
                    # No break: the following __next__() call decides
                    # what happens after the error.
                    trace.append(str(type(ex)))
            return trace

        def async_iterate(g):
            trace = []
            while True:
                an = g.__anext__()
                try:
                    # Step the awaitable by hand until it finishes.
                    while True:
                        try:
                            an.__next__()
                        except StopIteration as ex:
                            # The yielded value travels in StopIteration.
                            if ex.args:
                                trace.append(ex.args[0])
                            else:
                                trace.append('EMPTY StopIteration')
                            break
                        except StopAsyncIteration:
                            raise
                        except Exception as ex:
                            trace.append(str(type(ex)))
                            break
                except StopAsyncIteration:
                    trace.append('STOP')
                    break
            return trace

        sync_gen_result = sync_iterate(sync_gen)
        async_gen_result = async_iterate(async_gen)
        self.assertEqual(sync_gen_result, async_gen_result)
        return async_gen_result

    def test_async_gen_iteration_01(self):
        # Plain 'async for' iteration sends None into every yield.
        async def gen():
            await awaitable()
            a = yield 123
            self.assertIs(a, None)
            await awaitable()
            yield 456
            await awaitable()
            yield 789

        self.assertEqual(to_list(gen()), [123, 456, 789])

    def test_async_gen_iteration_02(self):
        # Step the __anext__() awaitables manually.
        async def gen():
            await awaitable()
            yield 123
            await awaitable()

        g = gen()
        ai = g.__aiter__()

        an = ai.__anext__()
        self.assertEqual(an.__next__(), ('result',))
        with self.assertRaises(StopIteration) as cm:
            an.__next__()
        self.assertEqual(cm.exception.args[0], 123)

        an = ai.__anext__()
        self.assertEqual(an.__next__(), ('result',))
        with self.assertRaises(StopAsyncIteration) as cm:
            an.__next__()
        self.assertFalse(cm.exception.args)

    def test_async_gen_exception_03(self):
        # An exception thrown into an await propagates to the consumer.
        async def gen():
            await awaitable()
            yield 123
            await awaitable(throw=True)
            yield 456

        with self.assertRaises(AwaitException):
            to_list(gen())

    def test_async_gen_exception_04(self):
        # An error raised after a yield surfaces on the next __anext__().
        async def gen():
            await awaitable()
            yield 123
            1 / 0

        g = gen()
        ai = g.__aiter__()
        an = ai.__anext__()
        self.assertEqual(an.__next__(), ('result',))

        with self.assertRaises(StopIteration) as cm:
            an.__next__()
        self.assertEqual(cm.exception.args[0], 123)

        with self.assertRaises(ZeroDivisionError):
            ai.__anext__().__next__()

    def test_async_gen_exception_05(self):
        # Raising StopAsyncIteration in the body becomes RuntimeError.
        async def gen():
            yield 123
            raise StopAsyncIteration

        with self.assertRaisesRegex(RuntimeError,
                                    'async generator.*StopAsyncIteration'):
            to_list(gen())

    def test_async_gen_exception_06(self):
        # Raising StopIteration in the body becomes RuntimeError.
        async def gen():
            yield 123
            raise StopIteration

        with self.assertRaisesRegex(RuntimeError,
                                    'async generator.*StopIteration'):
            to_list(gen())

    def test_async_gen_exception_07(self):
        # Yields inside 'finally' after an error in the 'try' body.
        def sync_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        async def async_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_08(self):
        # The error is raised between two yields inside 'finally'.
        def sync_gen():
            try:
                yield 1
            finally:
                yield 2
                1 / 0
                yield 3

            yield 100

        async def async_gen():
            try:
                yield 1
                await awaitable()
            finally:
                await awaitable()
                yield 2
                1 / 0
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_09(self):
        # Like exception_07, with awaits mixed into the async version.
        def sync_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        async def async_gen():
            try:
                await awaitable()
                yield 1
                1 / 0
            finally:
                yield 2
                await awaitable()
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_10(self):
        # A non-None value cannot be sent to a just-started generator.
        async def gen():
            yield 123

        with self.assertRaisesRegex(TypeError,
                                    "non-None value .* async generator"):
            gen().__anext__().send(100)

    def test_async_gen_exception_11(self):
        # GeneratorExit thrown into an inner generator from a wrapper.
        def sync_gen():
            yield 10
            yield 20

        def sync_gen_wrapper():
            yield 1
            sg = sync_gen()
            sg.send(None)
            try:
                sg.throw(GeneratorExit())
            except GeneratorExit:
                yield 2
            yield 3

        async def async_gen():
            yield 10
            yield 20

        async def async_gen_wrapper():
            yield 1
            asg = async_gen()
            await asg.asend(None)
            try:
                await asg.athrow(GeneratorExit())
            except GeneratorExit:
                yield 2
            yield 3

        self.compare_generators(sync_gen_wrapper(), async_gen_wrapper())

    def test_async_gen_api_01(self):
        # Introspection attributes of an async generator object.
        async def gen():
            yield 123

        g = gen()

        self.assertEqual(g.__name__, 'gen')
        g.__name__ = '123'
        self.assertEqual(g.__name__, '123')

        self.assertIn('.gen', g.__qualname__)
        g.__qualname__ = '123'
        self.assertEqual(g.__qualname__, '123')

        self.assertIsNone(g.ag_await)
        self.assertIsInstance(g.ag_frame, types.FrameType)
        self.assertFalse(g.ag_running)
        self.assertIsInstance(g.ag_code, types.CodeType)

        self.assertTrue(inspect.isawaitable(g.aclose()))
- class AsyncGenAsyncioTest(unittest.TestCase):
def setUp(self):
    """Create a fresh event loop on ``self.loop``; unset the current loop."""
    fresh_loop = asyncio.new_event_loop()
    self.loop = fresh_loop
    asyncio.set_event_loop(None)
def tearDown(self):
    """Close ``self.loop``, drop it, and reset the event loop policy."""
    loop = self.loop
    loop.close()
    self.loop = None
    asyncio.set_event_loop_policy(None)
def check_async_iterator_anext(self, ait_class):
    """Run the shared anext() checks with py_anext and the builtin anext.

    Each implementation runs in its own subTest.
    """
    implementations = (("pure-Python", py_anext), ("builtin", anext))
    for label, anext_func in implementations:
        with self.subTest(anext=label):
            self._check_async_iterator_anext(ait_class, anext_func)
def _check_async_iterator_anext(self, ait_class, anext):
    """Exercise *anext* against fresh instances of *ait_class*.

    ``ait_class()`` must build an async iterator that produces 1, then 2,
    and is then exhausted.  *anext* is the implementation under test; the
    parameter deliberately shadows the builtin inside this method.
    """
    g = ait_class()

    async def consume():
        return [
            await anext(g),
            await anext(g),
            await anext(g, 'buckle my shoe'),
        ]

    res = self.loop.run_until_complete(consume())
    self.assertEqual(res, [1, 2, 'buckle my shoe'])
    # ``g`` is exhausted now, so the first default-less anext() raises.
    with self.assertRaises(StopAsyncIteration):
        self.loop.run_until_complete(consume())

    async def test_2():
        g1 = ait_class()
        self.assertEqual(await anext(g1), 1)
        self.assertEqual(await anext(g1), 2)
        # Exhaustion is reported on every further call.
        for _ in range(2):
            with self.assertRaises(StopAsyncIteration):
                await anext(g1)

        g2 = ait_class()
        self.assertEqual(await anext(g2, "default"), 1)
        self.assertEqual(await anext(g2, "default"), 2)
        for _ in range(2):
            self.assertEqual(await anext(g2, "default"), "default")

        return "completed"

    result = self.loop.run_until_complete(test_2())
    self.assertEqual(result, "completed")

    def test_send():
        p = ait_class()
        obj = anext(p, "completed")
        with self.assertRaises(StopIteration):
            with contextlib.closing(obj.__await__()) as g:
                g.send(None)

    test_send()

    async def test_throw():
        p = ait_class()
        obj = anext(p, "completed")
        self.assertRaises(SyntaxError, obj.throw, SyntaxError)
        return "completed"

    result = self.loop.run_until_complete(test_throw())
    self.assertEqual(result, "completed")
def test_async_generator_anext(self):
    # Run the shared anext() checks against a native async generator.
    async def agen():
        for value in (1, 2):
            yield value

    self.check_async_iterator_anext(agen)
def test_python_async_iterator_anext(self):
    # Run the shared anext() checks against a class-based async iterator
    # whose __anext__ is a native coroutine function.
    class MyAsyncIter:
        """Asynchronously yield 1, then 2."""

        def __init__(self):
            self.yielded = 0

        def __aiter__(self):
            return self

        async def __anext__(self):
            if self.yielded >= 2:
                raise StopAsyncIteration()
            self.yielded += 1
            return self.yielded

    self.check_async_iterator_anext(MyAsyncIter)
def test_python_async_iterator_types_coroutine_anext(self):
    # Run the shared anext() checks against a class-based async iterator
    # whose __anext__ is a generator-based coroutine (@types.coroutine).
    # ``types`` is imported at module level, so no local import is needed.
    class MyAsyncIterWithTypesCoro:
        """Asynchronously yield 1, then 2."""

        def __init__(self):
            self.yielded = 0

        def __aiter__(self):
            return self

        @types.coroutine
        def __anext__(self):
            # The unreachable yield makes this a generator function.
            if False:
                yield "this is a generator-based coroutine"
            if self.yielded >= 2:
                raise StopAsyncIteration()
            self.yielded += 1
            return self.yielded

    self.check_async_iterator_anext(MyAsyncIterWithTypesCoro)
def test_async_gen_aiter(self):
    # aiter() applied to an async generator can be iterated normally.
    async def gen():
        yield 1
        yield 2

    g = gen()

    async def consume():
        collected = []
        async for value in aiter(g):
            collected.append(value)
        return collected

    res = self.loop.run_until_complete(consume())
    self.assertEqual(res, [1, 2])
def test_async_gen_aiter_class(self):
    # aiter() on an object whose __aiter__ is an async generator function.
    class Gen:
        async def __aiter__(self):
            yield 1
            yield 2

    g = Gen()
    results = []

    async def consume():
        ait = aiter(g)
        while True:
            try:
                value = await anext(ait)
            except StopAsyncIteration:
                break
            results.append(value)

    self.loop.run_until_complete(consume())
    self.assertEqual(results, [1, 2])
def test_aiter_idempotent(self):
    # aiter() of an async generator returns the very same object.
    async def gen():
        yield 1

    applied_once = aiter(gen())
    self.assertIs(applied_once, aiter(applied_once))
def test_anext_bad_args(self):
    # Every malformed anext() call must raise TypeError.
    async def gen():
        yield 1

    async def call_with_too_few_args():
        await anext()

    async def call_with_too_many_args():
        await anext(gen(), 1, 3)

    async def call_with_wrong_type_args():
        await anext(1, gen())

    async def call_with_kwarg():
        await anext(aiterator=gen())

    bad_calls = (
        call_with_too_few_args,
        call_with_too_many_args,
        call_with_wrong_type_args,
        call_with_kwarg,
    )
    for bad_call in bad_calls:
        with self.assertRaises(TypeError):
            self.loop.run_until_complete(bad_call())
def test_anext_bad_await(self):
    # __anext__ returns an object whose __await__ returns a non-iterator.
    async def bad_awaitable():
        class BadAwaitable:
            def __await__(self):
                return 42

        class MyAsyncIter:
            def __aiter__(self):
                return self

            def __anext__(self):
                return BadAwaitable()

        regex = r"__await__.*iterator"

        with_default = anext(MyAsyncIter(), "default")
        with self.assertRaisesRegex(TypeError, regex):
            await with_default

        without_default = anext(MyAsyncIter())
        with self.assertRaisesRegex(TypeError, regex):
            await without_default

        return "completed"

    result = self.loop.run_until_complete(bad_awaitable())
    self.assertEqual(result, "completed")
- async def check_anext_returning_iterator(self, aiter_class):
- awaitable = anext(aiter_class(), "default")
- with self.assertRaises(TypeError):
- await awaitable
- awaitable = anext(aiter_class())
- with self.assertRaises(TypeError):
- await awaitable
- return "completed"
def test_anext_return_iterator(self):
    # __anext__ returns a plain iterator instead of an awaitable.
    class WithIterAnext:
        def __aiter__(self):
            return self

        def __anext__(self):
            return iter("abc")

    check = self.check_anext_returning_iterator(WithIterAnext)
    result = self.loop.run_until_complete(check)
    self.assertEqual(result, "completed")
def test_anext_return_generator(self):
    # __anext__ is a plain generator function, not a coroutine.
    class WithGenAnext:
        def __aiter__(self):
            return self

        def __anext__(self):
            yield

    check = self.check_anext_returning_iterator(WithGenAnext)
    result = self.loop.run_until_complete(check)
    self.assertEqual(result, "completed")
def test_anext_await_raises(self):
    # An exception raised inside __await__ propagates through anext().
    class RaisingAwaitable:
        def __await__(self):
            raise ZeroDivisionError()
            yield  # unreachable; makes __await__ a generator function

    class WithRaisingAwaitableAnext:
        def __aiter__(self):
            return self

        def __anext__(self):
            return RaisingAwaitable()

    async def do_test():
        without_default = anext(WithRaisingAwaitableAnext())
        with self.assertRaises(ZeroDivisionError):
            await without_default

        with_default = anext(WithRaisingAwaitableAnext(), "default")
        with self.assertRaises(ZeroDivisionError):
            await with_default

        return "completed"

    result = self.loop.run_until_complete(do_test())
    self.assertEqual(result, "completed")
def test_anext_iter(self):
    # Drive the awaitable returned by anext(agen, "default") by hand via
    # send()/throw()/close(), once with py_anext() and once with the
    # builtin.  throw() is called with a single exception instance: the
    # (type, value, traceback) form is deprecated since Python 3.12.
    @types.coroutine
    def _async_yield(v):
        return (yield v)

    @types.coroutine
    def _async_yield_twice(v):
        # Suspends twice per await: first v * 10, then v * 10 + 1.
        yield v * 10
        return (yield (v * 10 + 1))

    class MyError(Exception):
        pass

    async def agenfn():
        try:
            await _async_yield(1)
        except MyError:
            await _async_yield(2)
        return
        yield  # unreachable; makes agenfn an async generator function

    def test1(anext):
        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 1)
            self.assertEqual(g.throw(MyError()), 2)
            try:
                g.send(None)
            except StopIteration as e:
                err = e
            else:
                self.fail('StopIteration was not raised')
            self.assertEqual(err.value, "default")

    def test2(anext):
        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 1)
            self.assertEqual(g.throw(MyError()), 2)
            with self.assertRaises(MyError):
                g.throw(MyError())

    def test3(anext):
        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 1)
            g.close()
            with self.assertRaisesRegex(RuntimeError, 'cannot reuse'):
                self.assertEqual(g.send(None), 1)

    def test4(anext):
        async def agenfn():
            try:
                await _async_yield_twice(1)
            except MyError:
                await _async_yield_twice(2)
            return
            yield

        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 10)
            self.assertEqual(g.throw(MyError()), 20)
            with self.assertRaisesRegex(MyError, 'val'):
                g.throw(MyError('val'))

    def test5(anext):
        async def agenfn():
            try:
                await _async_yield_twice(1)
            except MyError:
                return
            yield 'aaa'

        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 10)
            with self.assertRaisesRegex(StopIteration, 'default'):
                g.throw(MyError())

    def test6(anext):
        async def agenfn():
            await _async_yield_twice(1)
            yield 'aaa'

        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            with self.assertRaises(MyError):
                g.throw(MyError())

    def run_test(test):
        with self.subTest('pure-Python anext()'):
            test(py_anext)
        with self.subTest('builtin anext()'):
            test(anext)

    for test in (test1, test2, test3, test4, test5, test6):
        run_test(test)
def test_aiter_bad_args(self):
    # Every malformed aiter() call must raise TypeError.
    async def gen():
        yield 1

    async def call_with_too_few_args():
        await aiter()

    async def call_with_too_many_args():
        await aiter(gen(), 1)

    async def call_with_wrong_type_arg():
        await aiter(1)

    bad_calls = (
        call_with_too_few_args,
        call_with_too_many_args,
        call_with_wrong_type_arg,
    )
    for bad_call in bad_calls:
        with self.assertRaises(TypeError):
            self.loop.run_until_complete(bad_call())
async def to_list(self, gen):
    """Return a list of every item produced by the async iterable *gen*."""
    return [item async for item in gen]
def test_async_gen_asyncio_01(self):
    # A bare 'return' ends the iteration; the yield after it never runs.
    async def gen():
        yield 1
        await asyncio.sleep(0.01)
        yield 2
        await asyncio.sleep(0.01)
        return
        yield 3

    collected = self.loop.run_until_complete(self.to_list(gen()))
    self.assertEqual(collected, [1, 2])
def test_async_gen_asyncio_02(self):
    # An error raised in the generator body reaches the consumer.
    async def gen():
        yield 1
        await asyncio.sleep(0.01)
        yield 2
        1 / 0
        yield 3

    consumer = self.to_list(gen())
    with self.assertRaises(ZeroDivisionError):
        self.loop.run_until_complete(consumer)
def test_async_gen_asyncio_03(self):
    # 'async for' over an object whose __aiter__ is an async generator.
    class Gen:
        async def __aiter__(self):
            yield 1
            await asyncio.sleep(0.01)
            yield 2

    collected = self.loop.run_until_complete(self.to_list(Gen()))
    self.assertEqual(collected, [1, 2])
def test_async_gen_asyncio_anext_04(self):
    # Plain __anext__() iteration, then the same generator with an
    # exception thrown into one of the __anext__() awaitables.
    async def foo():
        yield 1
        await asyncio.sleep(0.01)
        try:
            yield 2
            yield 3
        except ZeroDivisionError:
            yield 1000
        await asyncio.sleep(0.01)
        yield 4

    async def run1():
        it = foo().__aiter__()

        for expected in (1, 2, 3, 4):
            self.assertEqual(await it.__anext__(), expected)
        # Exhaustion is reported on every further call.
        for _ in range(2):
            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

    async def run2():
        it = foo().__aiter__()

        self.assertEqual(await it.__anext__(), 1)
        self.assertEqual(await it.__anext__(), 2)
        # The handler's yielded value arrives wrapped in StopIteration.
        with self.assertRaises(StopIteration) as cm:
            it.__anext__().throw(ZeroDivisionError)
        self.assertEqual(cm.exception.args[0], 1000)

        self.assertEqual(await it.__anext__(), 4)
        with self.assertRaises(StopAsyncIteration):
            await it.__anext__()

    self.loop.run_until_complete(run1())
    self.loop.run_until_complete(run2())
def test_async_gen_asyncio_anext_05(self):
    # Values passed to send() on the __anext__() awaitable reach the
    # generator; each yielded value comes back inside StopIteration.
    async def foo():
        v = yield 1
        v = yield v
        yield v * 100

    async def run():
        it = foo().__aiter__()

        for sent, expected in ((None, 1), (10, 10), (12, 1200)):
            with self.assertRaises(StopIteration) as cm:
                it.__anext__().send(sent)
            self.assertEqual(cm.exception.args[0], expected)

        with self.assertRaises(StopAsyncIteration):
            await it.__anext__()

    self.loop.run_until_complete(run())
def test_async_gen_asyncio_anext_06(self):
    # A generator that finishes after a single yield: first the
    # synchronous version, then the asynchronous one.
    DONE = 0

    def foo():
        try:
            yield
        except:
            pass

    sync_g = foo()
    sync_g.send(None)
    with self.assertRaises(StopIteration):
        sync_g.send(None)

    async def gen():
        nonlocal DONE
        try:
            yield
        except:
            pass
        DONE = 1

    async def run():
        nonlocal DONE
        g = gen()
        await g.asend(None)
        with self.assertRaises(StopAsyncIteration):
            await g.asend(None)
        DONE += 10

    self.loop.run_until_complete(run())
    # 1 from gen() running to its end, plus 10 from run().
    self.assertEqual(DONE, 11)
def test_async_gen_asyncio_anext_tuple(self):
    # Tuples yielded by an async generator are delivered intact, both
    # through await and inside the StopIteration raised by throw().
    async def foo():
        try:
            yield (1,)
        except ZeroDivisionError:
            yield (2,)

    async def run():
        it = foo().__aiter__()

        first = await it.__anext__()
        self.assertEqual(first, (1,))

        with self.assertRaises(StopIteration) as cm:
            it.__anext__().throw(ZeroDivisionError)
        self.assertEqual(cm.exception.args[0], (2,))

        with self.assertRaises(StopAsyncIteration):
            await it.__anext__()

    self.loop.run_until_complete(run())
def test_async_gen_asyncio_anext_stopiteration(self):
    # StopIteration instances yielded as ordinary values are delivered
    # as values rather than being treated as the end of iteration.
    async def foo():
        try:
            yield StopIteration(1)
        except ZeroDivisionError:
            yield StopIteration(3)

    async def run():
        it = foo().__aiter__()

        first = await it.__anext__()
        self.assertIsInstance(first, StopIteration)
        self.assertEqual(first.value, 1)

        with self.assertRaises(StopIteration) as cm:
            it.__anext__().throw(ZeroDivisionError)
        second = cm.exception.args[0]
        self.assertIsInstance(second, StopIteration)
        self.assertEqual(second.value, 3)

        with self.assertRaises(StopAsyncIteration):
            await it.__anext__()

    self.loop.run_until_complete(run())
def test_async_gen_asyncio_aclose_06(self):
    # Yielding from 'finally' while being closed is a RuntimeError.
    async def foo():
        try:
            yield 1
            1 / 0
        finally:
            await asyncio.sleep(0.01)
            yield 12

    async def run():
        gen = foo()
        it = gen.__aiter__()
        await it.__anext__()
        await gen.aclose()

    expected_message = "async generator ignored GeneratorExit"
    with self.assertRaisesRegex(RuntimeError, expected_message):
        self.loop.run_until_complete(run())
def test_async_gen_asyncio_aclose_07(self):
    # aclose() runs the 'finally' block, awaits included, and nothing
    # after it.
    DONE = 0

    async def foo():
        nonlocal DONE
        try:
            yield 1
            1 / 0
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            DONE += 1
        DONE += 1000

    async def run():
        gen = foo()
        it = gen.__aiter__()
        await it.__anext__()
        await gen.aclose()

    self.loop.run_until_complete(run())
    # Only the increment inside 'finally' may have happened.
    self.assertEqual(DONE, 1)
def test_async_gen_asyncio_aclose_08(self):
    # Same as aclose_07, with a never-completed future in the 'try' body.
    DONE = 0

    fut = asyncio.Future(loop=self.loop)

    async def foo():
        nonlocal DONE
        try:
            yield 1
            await fut
            DONE += 1000
            yield 2
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            DONE += 1
        DONE += 1000

    async def run():
        gen = foo()
        it = gen.__aiter__()
        first = await it.__anext__()
        self.assertEqual(first, 1)
        await gen.aclose()

    self.loop.run_until_complete(run())
    self.assertEqual(DONE, 1)

    # Silence ResourceWarnings
    fut.cancel()
    self.loop.run_until_complete(asyncio.sleep(0.01))
def test_async_gen_asyncio_gc_aclose_09(self):
    # Dropping the last reference to a suspended async generator must
    # still get its 'finally' block executed.
    DONE = 0

    async def gen():
        nonlocal DONE
        try:
            while True:
                yield 1
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            DONE = 1

    async def run():
        g = gen()
        for _ in range(2):
            await g.__anext__()
        del g
        gc_collect()  # For PyPy or other GCs.

        # Keep the loop running while the 'finally' block executes.
        await asyncio.sleep(0.1)

    self.loop.run_until_complete(run())
    self.assertEqual(DONE, 1)
def test_async_gen_asyncio_aclose_10(self):
    # A generator that swallows the exception raised at its yield and
    # then finishes: close() / aclose() must complete quietly.
    DONE = 0

    # The synchronous version first.
    def foo():
        try:
            yield
        except:
            pass

    sync_g = foo()
    sync_g.send(None)
    sync_g.close()

    # The asynchronous version.
    async def gen():
        nonlocal DONE
        try:
            yield
        except:
            pass
        DONE = 1

    async def run():
        nonlocal DONE
        g = gen()
        await g.asend(None)
        await g.aclose()
        DONE += 10

    self.loop.run_until_complete(run())
    # 1 from gen() running to its end, plus 10 from run().
    self.assertEqual(DONE, 11)
def test_async_gen_asyncio_aclose_11(self):
    # A generator that swallows the exception raised at its yield and
    # then yields again: close() / aclose() must raise RuntimeError.
    DONE = 0

    # The synchronous version first.
    def foo():
        try:
            yield
        except:
            pass
        yield

    sync_g = foo()
    sync_g.send(None)
    with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'):
        sync_g.close()

    # The asynchronous version.
    async def gen():
        nonlocal DONE
        try:
            yield
        except:
            pass
        yield
        DONE += 1

    async def run():
        nonlocal DONE
        g = gen()
        await g.asend(None)
        with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'):
            await g.aclose()
        DONE += 10

    self.loop.run_until_complete(run())
    # gen() never gets past its second yield, so only run() contributes.
    self.assertEqual(DONE, 10)
def test_async_gen_asyncio_aclose_12(self):
    # While being closed, the generator awaits a failing task in
    # 'finally' and handles the task's exception.
    DONE = 0

    async def target():
        await asyncio.sleep(0.01)
        1 / 0

    async def foo():
        nonlocal DONE
        task = asyncio.create_task(target())
        try:
            yield 1
        finally:
            try:
                await task
            except ZeroDivisionError:
                DONE = 1

    async def run():
        gen = foo()
        it = gen.__aiter__()
        await it.__anext__()
        await gen.aclose()

    self.loop.run_until_complete(run())
    self.assertEqual(DONE, 1)
def test_async_gen_asyncio_asend_01(self):
    # asend() delivers a value to the suspended yield, like send() does
    # for a synchronous generator.
    DONE = 0

    # Sanity check:
    def sgen():
        v = yield 1
        yield v * 2

    sg = sgen()
    self.assertEqual(sg.send(None), 1)
    self.assertEqual(sg.send(100), 200)

    async def gen():
        nonlocal DONE
        try:
            await asyncio.sleep(0.01)
            v = yield 1
            await asyncio.sleep(0.01)
            yield v * 2
            await asyncio.sleep(0.01)
            return
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            DONE = 1

    async def run():
        g = gen()

        first = await g.asend(None)
        self.assertEqual(first, 1)

        second = await g.asend(100)
        self.assertEqual(second, 200)

        with self.assertRaises(StopAsyncIteration):
            await g.asend(None)

    self.loop.run_until_complete(run())
    self.assertEqual(DONE, 1)
def test_async_gen_asyncio_asend_02(self):
    # An error raised by an awaited coroutine inside the generator
    # propagates out of asend(), after 'finally' has run.
    DONE = 0

    async def sleep_n_crash(delay):
        await asyncio.sleep(delay)
        1 / 0

    async def gen():
        nonlocal DONE
        try:
            await asyncio.sleep(0.01)
            v = yield 1
            await sleep_n_crash(0.01)
            DONE += 1000
            yield v * 2
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            DONE = 1

    async def run():
        g = gen()
        first = await g.asend(None)
        self.assertEqual(first, 1)
        await g.asend(100)

    with self.assertRaises(ZeroDivisionError):
        self.loop.run_until_complete(run())
    self.assertEqual(DONE, 1)
def test_async_gen_asyncio_asend_03(self):
    # A cancelled future awaited inside the generator makes asend()
    # raise CancelledError, after 'finally' has run.
    DONE = 0

    async def sleep_n_crash(delay):
        fut = asyncio.ensure_future(asyncio.sleep(delay),
                                    loop=self.loop)
        # Cancel the sleep halfway through its delay.
        self.loop.call_later(delay / 2, fut.cancel)
        return await fut

    async def gen():
        nonlocal DONE
        try:
            await asyncio.sleep(0.01)
            v = yield 1
            await sleep_n_crash(0.01)
            DONE += 1000
            yield v * 2
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            DONE = 1

    async def run():
        g = gen()
        first = await g.asend(None)
        self.assertEqual(first, 1)
        await g.asend(100)

    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(run())
    self.assertEqual(DONE, 1)
def test_async_gen_asyncio_athrow_01(self):
    # athrow() on an async generator is checked against throw() on an
    # equivalent synchronous generator.
    finished = 0

    class FooEr(Exception):
        pass

    # Sanity check:
    def sync_gen():
        try:
            value = yield 1
        except FooEr:
            value = 1000
        yield value * 2

    sync_it = sync_gen()
    self.assertEqual(sync_it.send(None), 1)
    self.assertEqual(sync_it.throw(FooEr), 2000)
    with self.assertRaises(StopIteration):
        sync_it.send(None)

    async def agen_func():
        nonlocal finished
        try:
            await asyncio.sleep(0.01)
            try:
                value = yield 1
            except FooEr:
                value = 1000
                await asyncio.sleep(0.01)
            yield value * 2
            await asyncio.sleep(0.01)
            # return
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            finished = 1

    async def drive():
        agen = agen_func()

        first = await agen.asend(None)
        self.assertEqual(first, 1)

        # The generator catches FooEr and yields the substituted value.
        second = await agen.athrow(FooEr)
        self.assertEqual(second, 2000)

        with self.assertRaises(StopAsyncIteration):
            await agen.asend(None)

    self.loop.run_until_complete(drive())
    self.assertEqual(finished, 1)
def test_async_gen_asyncio_athrow_02(self):
    # The generator's FooEr handler awaits a future that gets cancelled;
    # athrow() must raise CancelledError, and by then the generator's
    # ``finally`` block has already set the flag.
    finished = 0

    class FooEr(Exception):
        pass

    async def sleep_then_cancel(delay):
        sleeper = asyncio.ensure_future(asyncio.sleep(delay),
                                        loop=self.loop)
        # Cancel the sleep after half of its delay has elapsed.
        self.loop.call_later(delay / 2, sleeper.cancel)
        return await sleeper

    async def agen_func():
        nonlocal finished
        try:
            await asyncio.sleep(0.01)
            try:
                value = yield 1
            except FooEr:
                await sleep_then_cancel(0.01)
            yield value * 2
            await asyncio.sleep(0.01)
            # return
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            finished = 1

    async def drive():
        agen = agen_func()

        first = await agen.asend(None)
        self.assertEqual(first, 1)

        try:
            await agen.athrow(FooEr)
        except asyncio.CancelledError:
            self.assertEqual(finished, 1)
            raise
        else:
            self.fail('CancelledError was not raised')

    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(drive())
    self.assertEqual(finished, 1)
def test_async_gen_asyncio_athrow_03(self):
    # A generator that swallows the thrown exception and then finishes
    # must end iteration: StopIteration for the synchronous flavour,
    # StopAsyncIteration for the asynchronous one.
    progress = 0

    # test synchronous generators
    def swallowing_gen():
        try:
            yield
        except:
            pass

    sync_it = swallowing_gen()
    sync_it.send(None)
    with self.assertRaises(StopIteration):
        sync_it.throw(ValueError)

    # now with asynchronous generators

    async def swallowing_agen():
        nonlocal progress
        try:
            yield
        except:
            pass
        progress = 1

    async def drive():
        nonlocal progress
        agen = swallowing_agen()
        await agen.asend(None)
        with self.assertRaises(StopAsyncIteration):
            await agen.athrow(ValueError)
        progress += 10

    self.loop.run_until_complete(drive())
    # 1 from the generator body plus 10 from the driver.
    self.assertEqual(progress, 11)
def test_async_gen_asyncio_athrow_tuple(self):
    # A tuple yielded in response to athrow() must come back unchanged.
    async def agen_func():
        try:
            yield 1
        except ZeroDivisionError:
            yield (2,)

    async def drive():
        agen = agen_func()

        first = await agen.asend(None)
        self.assertEqual(first, 1)

        second = await agen.athrow(ZeroDivisionError)
        self.assertEqual(second, (2,))

        with self.assertRaises(StopAsyncIteration):
            await agen.asend(None)

    self.loop.run_until_complete(drive())
def test_async_gen_asyncio_athrow_stopiteration(self):
    # A StopIteration *instance* yielded in response to athrow() is an
    # ordinary value and must be returned as such.
    async def agen_func():
        try:
            yield 1
        except ZeroDivisionError:
            yield StopIteration(2)

    async def drive():
        agen = agen_func()

        first = await agen.asend(None)
        self.assertEqual(first, 1)

        second = await agen.athrow(ZeroDivisionError)
        self.assertIsInstance(second, StopIteration)
        self.assertEqual(second.value, 2)

        with self.assertRaises(StopAsyncIteration):
            await agen.asend(None)

    self.loop.run_until_complete(drive())
def test_async_gen_asyncio_shutdown_01(self):
    # After the consuming tasks are cancelled and
    # loop.shutdown_asyncgens() has run, both generators' ``finally``
    # blocks must have executed.
    finalized = 0

    async def waiter(timeout):
        nonlocal finalized
        try:
            await asyncio.sleep(timeout)
            yield 1
        finally:
            await asyncio.sleep(0)
            finalized += 1

    async def wait():
        async for _ in waiter(1):
            pass

    consumers = [self.loop.create_task(wait()) for _ in range(2)]

    self.loop.run_until_complete(asyncio.sleep(0.1))

    # Silence warnings
    for consumer in consumers:
        consumer.cancel()

    for consumer in consumers:
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(consumer)

    self.loop.run_until_complete(self.loop.shutdown_asyncgens())

    self.assertEqual(finalized, 2)
def test_async_gen_asyncio_shutdown_02(self):
    # Abandoning a still-referenced async generator mid-iteration must
    # not report anything to the loop's exception handler during
    # asyncio.run().
    reported = []

    def record_context(loop, context):
        reported.append(context)

    async def async_iterate():
        yield 1
        yield 2

    # Created outside main() so it stays referenced after the loop exits.
    agen = async_iterate()

    async def main():
        running_loop = asyncio.get_running_loop()
        running_loop.set_exception_handler(record_context)

        async for _ in agen:
            break

    asyncio.run(main())

    self.assertEqual(reported, [])
def test_async_gen_asyncio_shutdown_exception_01(self):
    # An exception raised by the generator's ``finally`` block must be
    # passed to the loop's exception handler exactly once, with the
    # generator and the exception in the context.
    reported = []

    def record_context(loop, context):
        reported.append(context)

    async def async_iterate():
        try:
            yield 1
            yield 2
        finally:
            1/0

    agen = async_iterate()

    async def main():
        running_loop = asyncio.get_running_loop()
        running_loop.set_exception_handler(record_context)

        async for _ in agen:
            break

    asyncio.run(main())

    # Exactly one context is expected; unpacking fails otherwise.
    (context,) = reported
    self.assertEqual(context['asyncgen'], agen)
    self.assertIsInstance(context['exception'], ZeroDivisionError)
    self.assertIn('an error occurred during closing of asynchronous generator',
                  context['message'])
def test_async_gen_asyncio_shutdown_exception_02(self):
    # Variant where the generator is not referenced outside the loop:
    # it is abandoned and a garbage collection is forced inside main().
    # The failure in its ``finally`` block must still reach the loop's
    # exception handler exactly once.
    reported = []

    def record_context(loop, context):
        reported.append(context)

    async def async_iterate():
        try:
            yield 1
            yield 2
        finally:
            1/0

    async def main():
        running_loop = asyncio.get_running_loop()
        running_loop.set_exception_handler(record_context)

        async for _ in async_iterate():
            break
        gc_collect()

    asyncio.run(main())

    # Exactly one context is expected; unpacking fails otherwise.
    (context,) = reported
    self.assertIsInstance(context['exception'], ZeroDivisionError)
    self.assertIn('unhandled exception during asyncio.run() shutdown',
                  context['message'])
def test_async_gen_expression_01(self):
    # An async generator expression built in a plain function can be
    # consumed by an async comprehension.
    async def arange(n):
        for value in range(n):
            await asyncio.sleep(0.01)
            yield value

    def make_arange(n):
        # This syntax is legal starting with Python 3.7
        return (item * 2 async for item in arange(n))

    async def collect():
        return [item async for item in make_arange(10)]

    collected = self.loop.run_until_complete(collect())
    self.assertEqual(collected, [value * 2 for value in range(10)])
def test_async_gen_expression_02(self):
    # An ``await`` in the filter clause turns the generator expression
    # into an async one; 0 is dropped because wrap(0) is falsy.
    async def wrap(n):
        await asyncio.sleep(0.01)
        return n

    def make_arange(n):
        # This syntax is legal starting with Python 3.7
        return (item * 2 for item in range(n) if await wrap(item))

    async def collect():
        return [item async for item in make_arange(10)]

    collected = self.loop.run_until_complete(collect())
    self.assertEqual(collected, [value * 2 for value in range(1, 10)])
def test_asyncgen_nonstarted_hooks_are_cancellable(self):
    # See https://bugs.python.org/issue38013
    # Breaking out of an ``async for`` over an unreferenced generator
    # must not report anything to the loop's exception handler.
    reported = []

    def record_context(loop, context):
        reported.append(context)

    async def async_iterate():
        yield 1
        yield 2

    async def main():
        running_loop = asyncio.get_running_loop()
        running_loop.set_exception_handler(record_context)

        async for _ in async_iterate():
            break

    asyncio.run(main())

    self.assertEqual([], reported)
def test_async_gen_await_same_anext_coro_twice(self):
    # Awaiting the same __anext__() awaitable a second time must raise
    # RuntimeError.
    async def async_iterate():
        yield 1
        yield 2

    async def drive():
        agen = async_iterate()
        anext_awaitable = agen.__anext__()
        await anext_awaitable

        expected = r"cannot reuse already awaited __anext__\(\)/asend\(\)"
        with self.assertRaisesRegex(RuntimeError, expected):
            await anext_awaitable

        await agen.aclose()  # prevent unfinished iterator warning

    self.loop.run_until_complete(drive())
def test_async_gen_await_same_aclose_coro_twice(self):
    # Awaiting the same aclose() awaitable a second time must raise
    # RuntimeError.
    async def async_iterate():
        yield 1
        yield 2

    async def drive():
        agen = async_iterate()
        aclose_awaitable = agen.aclose()
        await aclose_awaitable

        expected = r"cannot reuse already awaited aclose\(\)/athrow\(\)"
        with self.assertRaisesRegex(RuntimeError, expected):
            await aclose_awaitable

    self.loop.run_until_complete(drive())
def test_async_gen_aclose_twice_with_different_coros(self):
    # Regression test for https://bugs.python.org/issue39606
    # Two separate aclose() awaitables on the same generator must both
    # complete without raising.
    async def async_iterate():
        yield 1
        yield 2

    async def drive():
        agen = async_iterate()
        for _ in range(2):
            await agen.aclose()

    self.loop.run_until_complete(drive())
def test_async_gen_aclose_after_exhaustion(self):
    # Regression test for https://bugs.python.org/issue39606
    # aclose() on a fully consumed generator must complete without
    # raising.
    async def async_iterate():
        yield 1
        yield 2

    async def drive():
        agen = async_iterate()
        async for _item in agen:
            pass
        await agen.aclose()

    self.loop.run_until_complete(drive())
def test_async_gen_aclose_compatible_with_get_stack(self):
    # Task.get_stack() must not raise for a task that wraps the
    # awaitable returned by an async generator's aclose().
    async def async_generator():
        yield object()

    async def run():
        ag = async_generator()
        # Keep a reference to the task: the event loop only holds weak
        # references to tasks, so an unreferenced task may be garbage
        # collected before it is done.
        aclose_task = asyncio.create_task(ag.aclose())
        for task in asyncio.all_tasks():
            # No AttributeError raised
            task.get_stack()
        # Let aclose() finish so that no pending task is left behind
        # when run() returns.
        await aclose_task

    self.loop.run_until_complete(run())
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|