| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054 |
- # -*- coding: utf-8 -*-
- """
- Test suite for PEP 380 implementation
- adapted from original tests written by Greg Ewing
- see <http://www.cosc.canterbury.ac.nz/greg.ewing/python/yield-from/YieldFrom-Python3.1.2-rev5.zip>
- """
- import unittest
- import inspect
- from test.support import captured_stderr, disable_gc, gc_collect
- from test import support
- class TestPEP380Operation(unittest.TestCase):
- """
- Test semantics.
- """
- def test_delegation_of_initial_next_to_subgenerator(self):
- """
- Test delegation of initial next() call to subgenerator
- """
- trace = []
- def g1():
- trace.append("Starting g1")
- yield from g2()
- trace.append("Finishing g1")
- def g2():
- trace.append("Starting g2")
- yield 42
- trace.append("Finishing g2")
- for x in g1():
- trace.append("Yielded %s" % (x,))
- self.assertEqual(trace,[
- "Starting g1",
- "Starting g2",
- "Yielded 42",
- "Finishing g2",
- "Finishing g1",
- ])
- def test_raising_exception_in_initial_next_call(self):
- """
- Test raising exception in initial next() call
- """
- trace = []
- def g1():
- try:
- trace.append("Starting g1")
- yield from g2()
- finally:
- trace.append("Finishing g1")
- def g2():
- try:
- trace.append("Starting g2")
- raise ValueError("spanish inquisition occurred")
- finally:
- trace.append("Finishing g2")
- try:
- for x in g1():
- trace.append("Yielded %s" % (x,))
- except ValueError as e:
- self.assertEqual(e.args[0], "spanish inquisition occurred")
- else:
- self.fail("subgenerator failed to raise ValueError")
- self.assertEqual(trace,[
- "Starting g1",
- "Starting g2",
- "Finishing g2",
- "Finishing g1",
- ])
- def test_delegation_of_next_call_to_subgenerator(self):
- """
- Test delegation of next() call to subgenerator
- """
- trace = []
- def g1():
- trace.append("Starting g1")
- yield "g1 ham"
- yield from g2()
- yield "g1 eggs"
- trace.append("Finishing g1")
- def g2():
- trace.append("Starting g2")
- yield "g2 spam"
- yield "g2 more spam"
- trace.append("Finishing g2")
- for x in g1():
- trace.append("Yielded %s" % (x,))
- self.assertEqual(trace,[
- "Starting g1",
- "Yielded g1 ham",
- "Starting g2",
- "Yielded g2 spam",
- "Yielded g2 more spam",
- "Finishing g2",
- "Yielded g1 eggs",
- "Finishing g1",
- ])
- def test_raising_exception_in_delegated_next_call(self):
- """
- Test raising exception in delegated next() call
- """
- trace = []
- def g1():
- try:
- trace.append("Starting g1")
- yield "g1 ham"
- yield from g2()
- yield "g1 eggs"
- finally:
- trace.append("Finishing g1")
- def g2():
- try:
- trace.append("Starting g2")
- yield "g2 spam"
- raise ValueError("hovercraft is full of eels")
- yield "g2 more spam"
- finally:
- trace.append("Finishing g2")
- try:
- for x in g1():
- trace.append("Yielded %s" % (x,))
- except ValueError as e:
- self.assertEqual(e.args[0], "hovercraft is full of eels")
- else:
- self.fail("subgenerator failed to raise ValueError")
- self.assertEqual(trace,[
- "Starting g1",
- "Yielded g1 ham",
- "Starting g2",
- "Yielded g2 spam",
- "Finishing g2",
- "Finishing g1",
- ])
- def test_delegation_of_send(self):
- """
- Test delegation of send()
- """
- trace = []
- def g1():
- trace.append("Starting g1")
- x = yield "g1 ham"
- trace.append("g1 received %s" % (x,))
- yield from g2()
- x = yield "g1 eggs"
- trace.append("g1 received %s" % (x,))
- trace.append("Finishing g1")
- def g2():
- trace.append("Starting g2")
- x = yield "g2 spam"
- trace.append("g2 received %s" % (x,))
- x = yield "g2 more spam"
- trace.append("g2 received %s" % (x,))
- trace.append("Finishing g2")
- g = g1()
- y = next(g)
- x = 1
- try:
- while 1:
- y = g.send(x)
- trace.append("Yielded %s" % (y,))
- x += 1
- except StopIteration:
- pass
- self.assertEqual(trace,[
- "Starting g1",
- "g1 received 1",
- "Starting g2",
- "Yielded g2 spam",
- "g2 received 2",
- "Yielded g2 more spam",
- "g2 received 3",
- "Finishing g2",
- "Yielded g1 eggs",
- "g1 received 4",
- "Finishing g1",
- ])
- def test_handling_exception_while_delegating_send(self):
- """
- Test handling exception while delegating 'send'
- """
- trace = []
- def g1():
- trace.append("Starting g1")
- x = yield "g1 ham"
- trace.append("g1 received %s" % (x,))
- yield from g2()
- x = yield "g1 eggs"
- trace.append("g1 received %s" % (x,))
- trace.append("Finishing g1")
- def g2():
- trace.append("Starting g2")
- x = yield "g2 spam"
- trace.append("g2 received %s" % (x,))
- raise ValueError("hovercraft is full of eels")
- x = yield "g2 more spam"
- trace.append("g2 received %s" % (x,))
- trace.append("Finishing g2")
- def run():
- g = g1()
- y = next(g)
- x = 1
- try:
- while 1:
- y = g.send(x)
- trace.append("Yielded %s" % (y,))
- x += 1
- except StopIteration:
- trace.append("StopIteration")
- self.assertRaises(ValueError,run)
- self.assertEqual(trace,[
- "Starting g1",
- "g1 received 1",
- "Starting g2",
- "Yielded g2 spam",
- "g2 received 2",
- ])
- def test_delegating_close(self):
- """
- Test delegating 'close'
- """
- trace = []
- def g1():
- try:
- trace.append("Starting g1")
- yield "g1 ham"
- yield from g2()
- yield "g1 eggs"
- finally:
- trace.append("Finishing g1")
- def g2():
- try:
- trace.append("Starting g2")
- yield "g2 spam"
- yield "g2 more spam"
- finally:
- trace.append("Finishing g2")
- g = g1()
- for i in range(2):
- x = next(g)
- trace.append("Yielded %s" % (x,))
- g.close()
- self.assertEqual(trace,[
- "Starting g1",
- "Yielded g1 ham",
- "Starting g2",
- "Yielded g2 spam",
- "Finishing g2",
- "Finishing g1"
- ])
- def test_handing_exception_while_delegating_close(self):
- """
- Test handling exception while delegating 'close'
- """
- trace = []
- def g1():
- try:
- trace.append("Starting g1")
- yield "g1 ham"
- yield from g2()
- yield "g1 eggs"
- finally:
- trace.append("Finishing g1")
- def g2():
- try:
- trace.append("Starting g2")
- yield "g2 spam"
- yield "g2 more spam"
- finally:
- trace.append("Finishing g2")
- raise ValueError("nybbles have exploded with delight")
- try:
- g = g1()
- for i in range(2):
- x = next(g)
- trace.append("Yielded %s" % (x,))
- g.close()
- except ValueError as e:
- self.assertEqual(e.args[0], "nybbles have exploded with delight")
- self.assertIsInstance(e.__context__, GeneratorExit)
- else:
- self.fail("subgenerator failed to raise ValueError")
- self.assertEqual(trace,[
- "Starting g1",
- "Yielded g1 ham",
- "Starting g2",
- "Yielded g2 spam",
- "Finishing g2",
- "Finishing g1",
- ])
- def test_delegating_throw(self):
- """
- Test delegating 'throw'
- """
- trace = []
- def g1():
- try:
- trace.append("Starting g1")
- yield "g1 ham"
- yield from g2()
- yield "g1 eggs"
- finally:
- trace.append("Finishing g1")
- def g2():
- try:
- trace.append("Starting g2")
- yield "g2 spam"
- yield "g2 more spam"
- finally:
- trace.append("Finishing g2")
- try:
- g = g1()
- for i in range(2):
- x = next(g)
- trace.append("Yielded %s" % (x,))
- e = ValueError("tomato ejected")
- g.throw(e)
- except ValueError as e:
- self.assertEqual(e.args[0], "tomato ejected")
- else:
- self.fail("subgenerator failed to raise ValueError")
- self.assertEqual(trace,[
- "Starting g1",
- "Yielded g1 ham",
- "Starting g2",
- "Yielded g2 spam",
- "Finishing g2",
- "Finishing g1",
- ])
- def test_value_attribute_of_StopIteration_exception(self):
- """
- Test 'value' attribute of StopIteration exception
- """
- trace = []
- def pex(e):
- trace.append("%s: %s" % (e.__class__.__name__, e))
- trace.append("value = %s" % (e.value,))
- e = StopIteration()
- pex(e)
- e = StopIteration("spam")
- pex(e)
- e.value = "eggs"
- pex(e)
- self.assertEqual(trace,[
- "StopIteration: ",
- "value = None",
- "StopIteration: spam",
- "value = spam",
- "StopIteration: spam",
- "value = eggs",
- ])
- def test_exception_value_crash(self):
- # There used to be a refcount error when the return value
- # stored in the StopIteration has a refcount of 1.
- def g1():
- yield from g2()
- def g2():
- yield "g2"
- return [42]
- self.assertEqual(list(g1()), ["g2"])
- def test_generator_return_value(self):
- """
- Test generator return value
- """
- trace = []
- def g1():
- trace.append("Starting g1")
- yield "g1 ham"
- ret = yield from g2()
- trace.append("g2 returned %r" % (ret,))
- for v in 1, (2,), StopIteration(3):
- ret = yield from g2(v)
- trace.append("g2 returned %r" % (ret,))
- yield "g1 eggs"
- trace.append("Finishing g1")
- def g2(v = None):
- trace.append("Starting g2")
- yield "g2 spam"
- yield "g2 more spam"
- trace.append("Finishing g2")
- if v:
- return v
- for x in g1():
- trace.append("Yielded %s" % (x,))
- self.assertEqual(trace,[
- "Starting g1",
- "Yielded g1 ham",
- "Starting g2",
- "Yielded g2 spam",
- "Yielded g2 more spam",
- "Finishing g2",
- "g2 returned None",
- "Starting g2",
- "Yielded g2 spam",
- "Yielded g2 more spam",
- "Finishing g2",
- "g2 returned 1",
- "Starting g2",
- "Yielded g2 spam",
- "Yielded g2 more spam",
- "Finishing g2",
- "g2 returned (2,)",
- "Starting g2",
- "Yielded g2 spam",
- "Yielded g2 more spam",
- "Finishing g2",
- "g2 returned StopIteration(3)",
- "Yielded g1 eggs",
- "Finishing g1",
- ])
- def test_delegation_of_next_to_non_generator(self):
- """
- Test delegation of next() to non-generator
- """
- trace = []
- def g():
- yield from range(3)
- for x in g():
- trace.append("Yielded %s" % (x,))
- self.assertEqual(trace,[
- "Yielded 0",
- "Yielded 1",
- "Yielded 2",
- ])
- def test_conversion_of_sendNone_to_next(self):
- """
- Test conversion of send(None) to next()
- """
- trace = []
- def g():
- yield from range(3)
- gi = g()
- for x in range(3):
- y = gi.send(None)
- trace.append("Yielded: %s" % (y,))
- self.assertEqual(trace,[
- "Yielded: 0",
- "Yielded: 1",
- "Yielded: 2",
- ])
- def test_delegation_of_close_to_non_generator(self):
- """
- Test delegation of close() to non-generator
- """
- trace = []
- def g():
- try:
- trace.append("starting g")
- yield from range(3)
- trace.append("g should not be here")
- finally:
- trace.append("finishing g")
- gi = g()
- next(gi)
- with captured_stderr() as output:
- gi.close()
- self.assertEqual(output.getvalue(), '')
- self.assertEqual(trace,[
- "starting g",
- "finishing g",
- ])
- def test_delegating_throw_to_non_generator(self):
- """
- Test delegating 'throw' to non-generator
- """
- trace = []
- def g():
- try:
- trace.append("Starting g")
- yield from range(10)
- finally:
- trace.append("Finishing g")
- try:
- gi = g()
- for i in range(5):
- x = next(gi)
- trace.append("Yielded %s" % (x,))
- e = ValueError("tomato ejected")
- gi.throw(e)
- except ValueError as e:
- self.assertEqual(e.args[0],"tomato ejected")
- else:
- self.fail("subgenerator failed to raise ValueError")
- self.assertEqual(trace,[
- "Starting g",
- "Yielded 0",
- "Yielded 1",
- "Yielded 2",
- "Yielded 3",
- "Yielded 4",
- "Finishing g",
- ])
- def test_attempting_to_send_to_non_generator(self):
- """
- Test attempting to send to non-generator
- """
- trace = []
- def g():
- try:
- trace.append("starting g")
- yield from range(3)
- trace.append("g should not be here")
- finally:
- trace.append("finishing g")
- try:
- gi = g()
- next(gi)
- for x in range(3):
- y = gi.send(42)
- trace.append("Should not have yielded: %s" % (y,))
- except AttributeError as e:
- self.assertIn("send", e.args[0])
- else:
- self.fail("was able to send into non-generator")
- self.assertEqual(trace,[
- "starting g",
- "finishing g",
- ])
- def test_broken_getattr_handling(self):
- """
- Test subiterator with a broken getattr implementation
- """
- class Broken:
- def __iter__(self):
- return self
- def __next__(self):
- return 1
- def __getattr__(self, attr):
- 1/0
- def g():
- yield from Broken()
- with self.assertRaises(ZeroDivisionError):
- gi = g()
- self.assertEqual(next(gi), 1)
- gi.send(1)
- with self.assertRaises(ZeroDivisionError):
- gi = g()
- self.assertEqual(next(gi), 1)
- gi.throw(AttributeError)
- with support.catch_unraisable_exception() as cm:
- gi = g()
- self.assertEqual(next(gi), 1)
- gi.close()
- self.assertEqual(ZeroDivisionError, cm.unraisable.exc_type)
- def test_exception_in_initial_next_call(self):
- """
- Test exception in initial next() call
- """
- trace = []
- def g1():
- trace.append("g1 about to yield from g2")
- yield from g2()
- trace.append("g1 should not be here")
- def g2():
- yield 1/0
- def run():
- gi = g1()
- next(gi)
- self.assertRaises(ZeroDivisionError,run)
- self.assertEqual(trace,[
- "g1 about to yield from g2"
- ])
- def test_attempted_yield_from_loop(self):
- """
- Test attempted yield-from loop
- """
- trace = []
- def g1():
- trace.append("g1: starting")
- yield "y1"
- trace.append("g1: about to yield from g2")
- yield from g2()
- trace.append("g1 should not be here")
- def g2():
- trace.append("g2: starting")
- yield "y2"
- trace.append("g2: about to yield from g1")
- yield from gi
- trace.append("g2 should not be here")
- try:
- gi = g1()
- for y in gi:
- trace.append("Yielded: %s" % (y,))
- except ValueError as e:
- self.assertEqual(e.args[0],"generator already executing")
- else:
- self.fail("subgenerator didn't raise ValueError")
- self.assertEqual(trace,[
- "g1: starting",
- "Yielded: y1",
- "g1: about to yield from g2",
- "g2: starting",
- "Yielded: y2",
- "g2: about to yield from g1",
- ])
- def test_returning_value_from_delegated_throw(self):
- """
- Test returning value from delegated 'throw'
- """
- trace = []
- def g1():
- try:
- trace.append("Starting g1")
- yield "g1 ham"
- yield from g2()
- yield "g1 eggs"
- finally:
- trace.append("Finishing g1")
- def g2():
- try:
- trace.append("Starting g2")
- yield "g2 spam"
- yield "g2 more spam"
- except LunchError:
- trace.append("Caught LunchError in g2")
- yield "g2 lunch saved"
- yield "g2 yet more spam"
- class LunchError(Exception):
- pass
- g = g1()
- for i in range(2):
- x = next(g)
- trace.append("Yielded %s" % (x,))
- e = LunchError("tomato ejected")
- g.throw(e)
- for x in g:
- trace.append("Yielded %s" % (x,))
- self.assertEqual(trace,[
- "Starting g1",
- "Yielded g1 ham",
- "Starting g2",
- "Yielded g2 spam",
- "Caught LunchError in g2",
- "Yielded g2 yet more spam",
- "Yielded g1 eggs",
- "Finishing g1",
- ])
- def test_next_and_return_with_value(self):
- """
- Test next and return with value
- """
- trace = []
- def f(r):
- gi = g(r)
- next(gi)
- try:
- trace.append("f resuming g")
- next(gi)
- trace.append("f SHOULD NOT BE HERE")
- except StopIteration as e:
- trace.append("f caught %r" % (e,))
- def g(r):
- trace.append("g starting")
- yield
- trace.append("g returning %r" % (r,))
- return r
- f(None)
- f(1)
- f((2,))
- f(StopIteration(3))
- self.assertEqual(trace,[
- "g starting",
- "f resuming g",
- "g returning None",
- "f caught StopIteration()",
- "g starting",
- "f resuming g",
- "g returning 1",
- "f caught StopIteration(1)",
- "g starting",
- "f resuming g",
- "g returning (2,)",
- "f caught StopIteration((2,))",
- "g starting",
- "f resuming g",
- "g returning StopIteration(3)",
- "f caught StopIteration(StopIteration(3))",
- ])
- def test_send_and_return_with_value(self):
- """
- Test send and return with value
- """
- trace = []
- def f(r):
- gi = g(r)
- next(gi)
- try:
- trace.append("f sending spam to g")
- gi.send("spam")
- trace.append("f SHOULD NOT BE HERE")
- except StopIteration as e:
- trace.append("f caught %r" % (e,))
- def g(r):
- trace.append("g starting")
- x = yield
- trace.append("g received %r" % (x,))
- trace.append("g returning %r" % (r,))
- return r
- f(None)
- f(1)
- f((2,))
- f(StopIteration(3))
- self.assertEqual(trace, [
- "g starting",
- "f sending spam to g",
- "g received 'spam'",
- "g returning None",
- "f caught StopIteration()",
- "g starting",
- "f sending spam to g",
- "g received 'spam'",
- "g returning 1",
- 'f caught StopIteration(1)',
- 'g starting',
- 'f sending spam to g',
- "g received 'spam'",
- 'g returning (2,)',
- 'f caught StopIteration((2,))',
- 'g starting',
- 'f sending spam to g',
- "g received 'spam'",
- 'g returning StopIteration(3)',
- 'f caught StopIteration(StopIteration(3))'
- ])
- def test_catching_exception_from_subgen_and_returning(self):
- """
- Test catching an exception thrown into a
- subgenerator and returning a value
- """
- def inner():
- try:
- yield 1
- except ValueError:
- trace.append("inner caught ValueError")
- return value
- def outer():
- v = yield from inner()
- trace.append("inner returned %r to outer" % (v,))
- yield v
- for value in 2, (2,), StopIteration(2):
- trace = []
- g = outer()
- trace.append(next(g))
- trace.append(repr(g.throw(ValueError)))
- self.assertEqual(trace, [
- 1,
- "inner caught ValueError",
- "inner returned %r to outer" % (value,),
- repr(value),
- ])
- def test_throwing_GeneratorExit_into_subgen_that_returns(self):
- """
- Test throwing GeneratorExit into a subgenerator that
- catches it and returns normally.
- """
- trace = []
- def f():
- try:
- trace.append("Enter f")
- yield
- trace.append("Exit f")
- except GeneratorExit:
- return
- def g():
- trace.append("Enter g")
- yield from f()
- trace.append("Exit g")
- try:
- gi = g()
- next(gi)
- gi.throw(GeneratorExit)
- except GeneratorExit:
- pass
- else:
- self.fail("subgenerator failed to raise GeneratorExit")
- self.assertEqual(trace,[
- "Enter g",
- "Enter f",
- ])
- def test_throwing_GeneratorExit_into_subgenerator_that_yields(self):
- """
- Test throwing GeneratorExit into a subgenerator that
- catches it and yields.
- """
- trace = []
- def f():
- try:
- trace.append("Enter f")
- yield
- trace.append("Exit f")
- except GeneratorExit:
- yield
- def g():
- trace.append("Enter g")
- yield from f()
- trace.append("Exit g")
- try:
- gi = g()
- next(gi)
- gi.throw(GeneratorExit)
- except RuntimeError as e:
- self.assertEqual(e.args[0], "generator ignored GeneratorExit")
- else:
- self.fail("subgenerator failed to raise GeneratorExit")
- self.assertEqual(trace,[
- "Enter g",
- "Enter f",
- ])
- def test_throwing_GeneratorExit_into_subgen_that_raises(self):
- """
- Test throwing GeneratorExit into a subgenerator that
- catches it and raises a different exception.
- """
- trace = []
- def f():
- try:
- trace.append("Enter f")
- yield
- trace.append("Exit f")
- except GeneratorExit:
- raise ValueError("Vorpal bunny encountered")
- def g():
- trace.append("Enter g")
- yield from f()
- trace.append("Exit g")
- try:
- gi = g()
- next(gi)
- gi.throw(GeneratorExit)
- except ValueError as e:
- self.assertEqual(e.args[0], "Vorpal bunny encountered")
- self.assertIsInstance(e.__context__, GeneratorExit)
- else:
- self.fail("subgenerator failed to raise ValueError")
- self.assertEqual(trace,[
- "Enter g",
- "Enter f",
- ])
- def test_yield_from_empty(self):
- def g():
- yield from ()
- self.assertRaises(StopIteration, next, g())
- def test_delegating_generators_claim_to_be_running(self):
- # Check with basic iteration
- def one():
- yield 0
- yield from two()
- yield 3
- def two():
- yield 1
- try:
- yield from g1
- except ValueError:
- pass
- yield 2
- g1 = one()
- self.assertEqual(list(g1), [0, 1, 2, 3])
- # Check with send
- g1 = one()
- res = [next(g1)]
- try:
- while True:
- res.append(g1.send(42))
- except StopIteration:
- pass
- self.assertEqual(res, [0, 1, 2, 3])
- # Check with throw
- class MyErr(Exception):
- pass
- def one():
- try:
- yield 0
- except MyErr:
- pass
- yield from two()
- try:
- yield 3
- except MyErr:
- pass
- def two():
- try:
- yield 1
- except MyErr:
- pass
- try:
- yield from g1
- except ValueError:
- pass
- try:
- yield 2
- except MyErr:
- pass
- g1 = one()
- res = [next(g1)]
- try:
- while True:
- res.append(g1.throw(MyErr))
- except StopIteration:
- pass
- except:
- self.assertEqual(res, [0, 1, 2, 3])
- raise
- # Check with close
- class MyIt(object):
- def __iter__(self):
- return self
- def __next__(self):
- return 42
- def close(self_):
- self.assertTrue(g1.gi_running)
- self.assertRaises(ValueError, next, g1)
- def one():
- yield from MyIt()
- g1 = one()
- next(g1)
- g1.close()
- def test_delegator_is_visible_to_debugger(self):
- def call_stack():
- return [f[3] for f in inspect.stack()]
- def gen():
- yield call_stack()
- yield call_stack()
- yield call_stack()
- def spam(g):
- yield from g
- def eggs(g):
- yield from g
- for stack in spam(gen()):
- self.assertTrue('spam' in stack)
- for stack in spam(eggs(gen())):
- self.assertTrue('spam' in stack and 'eggs' in stack)
- def test_custom_iterator_return(self):
- # See issue #15568
- class MyIter:
- def __iter__(self):
- return self
- def __next__(self):
- raise StopIteration(42)
- def gen():
- nonlocal ret
- ret = yield from MyIter()
- ret = None
- list(gen())
- self.assertEqual(ret, 42)
- def test_close_with_cleared_frame(self):
- # See issue #17669.
- #
- # Create a stack of generators: outer() delegating to inner()
- # delegating to innermost(). The key point is that the instance of
- # inner is created first: this ensures that its frame appears before
- # the instance of outer in the GC linked list.
- #
- # At the gc.collect call:
- # - frame_clear is called on the inner_gen frame.
- # - gen_dealloc is called on the outer_gen generator (the only
- # reference is in the frame's locals).
- # - gen_close is called on the outer_gen generator.
- # - gen_close_iter is called to close the inner_gen generator, which
- # in turn calls gen_close, and gen_yf.
- #
- # Previously, gen_yf would crash since inner_gen's frame had been
- # cleared (and in particular f_stacktop was NULL).
- def innermost():
- yield
- def inner():
- outer_gen = yield
- yield from innermost()
- def outer():
- inner_gen = yield
- yield from inner_gen
- with disable_gc():
- inner_gen = inner()
- outer_gen = outer()
- outer_gen.send(None)
- outer_gen.send(inner_gen)
- outer_gen.send(outer_gen)
- del outer_gen
- del inner_gen
- gc_collect()
- def test_send_tuple_with_custom_generator(self):
- # See issue #21209.
- class MyGen:
- def __iter__(self):
- return self
- def __next__(self):
- return 42
- def send(self, what):
- nonlocal v
- v = what
- return None
- def outer():
- v = yield from MyGen()
- g = outer()
- next(g)
- v = None
- g.send((1, 2, 3, 4))
- self.assertEqual(v, (1, 2, 3, 4))
- if __name__ == '__main__':
- unittest.main()
|