summaryrefslogtreecommitdiff
path: root/Lib/test/test_asyncio/test_tasks.py
diff options
context:
space:
mode:
authorChris Jerdonek <chris.jerdonek@gmail.com>2020-05-13 16:18:27 -0700
committerGitHub <noreply@github.com>2020-05-13 16:18:27 -0700
commit75cd8e48c62c97fdb9d9a94fd2335be06084471d (patch)
treebcd719b2442c302d0fb5100fb57894b6517ce090 /Lib/test/test_asyncio/test_tasks.py
parentd6fb53fe42d83a10f1372dd92ffaa6a01d2feffb (diff)
downloadcpython-git-75cd8e48c62c97fdb9d9a94fd2335be06084471d.tar.gz
bpo-29587: Make gen.throw() chain exceptions with yield from (GH-19858)
The previous commits on bpo-29587 got exception chaining working with gen.throw() in the `yield` case. This patch also gets the `yield from` case working. As a consequence, implicit exception chaining now also works in the asyncio scenario of awaiting on a task when an exception is already active. Tests are included for both the asyncio case and the pure generator-only case.
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r--Lib/test/test_asyncio/test_tasks.py27
1 files changed, 27 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 68f3b8cce9..6eb6b46ec8 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -466,6 +466,33 @@ class BaseTaskTests:
t = outer()
self.assertEqual(self.loop.run_until_complete(t), 1042)
+ def test_exception_chaining_after_await(self):
+ # Test that when awaiting on a task when an exception is already
+ # active, if the task raises an exception it will be chained
+ # with the original.
+ loop = asyncio.new_event_loop()
+ self.set_event_loop(loop)
+
+ async def raise_error():
+ raise ValueError
+
+ async def run():
+ try:
+ raise KeyError(3)
+ except Exception as exc:
+ task = self.new_task(loop, raise_error())
+ try:
+ await task
+ except Exception as exc:
+ self.assertEqual(type(exc), ValueError)
+ chained = exc.__context__
+ self.assertEqual((type(chained), chained.args),
+ (KeyError, (3,)))
+
+ task = self.new_task(loop, run())
+ loop.run_until_complete(task)
+ loop.close()
+
def test_cancel(self):
def gen():