summaryrefslogtreecommitdiff
path: root/Lib/test/test_sys_settrace.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_sys_settrace.py')
-rw-r--r--Lib/test/test_sys_settrace.py117
1 files changed, 117 insertions, 0 deletions
diff --git a/Lib/test/test_sys_settrace.py b/Lib/test/test_sys_settrace.py
index 90d1e37a39..2587794c69 100644
--- a/Lib/test/test_sys_settrace.py
+++ b/Lib/test/test_sys_settrace.py
@@ -6,6 +6,8 @@ import sys
import difflib
import gc
from functools import wraps
+import asyncio
+
class tracecontext:
"""Context manager that traces its enter and exit."""
@@ -19,6 +21,20 @@ class tracecontext:
def __exit__(self, *exc_info):
self.output.append(-self.value)
+class asynctracecontext:
+ """Asynchronous context manager that traces its aenter and aexit."""
+ def __init__(self, output, value):
+ self.output = output
+ self.value = value
+
+ async def __aenter__(self):
+ self.output.append(self.value)
+
+ async def __aexit__(self, *exc_info):
+ self.output.append(-self.value)
+
+
+
# A very basic example. If this fails, we're in deep trouble.
def basic():
return 1
@@ -636,6 +652,19 @@ class JumpTestCase(unittest.TestCase):
sys.settrace(None)
self.compare_jump_output(expected, output)
+ def run_async_test(self, func, jumpFrom, jumpTo, expected, error=None,
+ event='line', decorated=False):
+ tracer = JumpTracer(func, jumpFrom, jumpTo, event, decorated)
+ sys.settrace(tracer.trace)
+ output = []
+ if error is None:
+ asyncio.run(func(output))
+ else:
+ with self.assertRaisesRegex(*error):
+ asyncio.run(func(output))
+ sys.settrace(None)
+ self.compare_jump_output(expected, output)
+
def jump_test(jumpFrom, jumpTo, expected, error=None, event='line'):
"""Decorator that creates a test that makes a jump
from one place to another in the following code.
@@ -648,6 +677,18 @@ class JumpTestCase(unittest.TestCase):
return test
return decorator
+ def async_jump_test(jumpFrom, jumpTo, expected, error=None, event='line'):
+ """Decorator that creates a test that makes a jump
+ from one place to another in the following asynchronous code.
+ """
+ def decorator(func):
+ @wraps(func)
+ def test(self):
+ self.run_async_test(func, jumpFrom, jumpTo, expected,
+ error=error, event=event, decorated=True)
+ return test
+ return decorator
+
## The first set of 'jump' tests are for things that are allowed:
@jump_test(1, 3, [3])
@@ -774,12 +815,24 @@ class JumpTestCase(unittest.TestCase):
output.append(2)
output.append(3)
+ @async_jump_test(2, 3, [1, 3])
+ async def test_jump_forwards_out_of_async_with_block(output):
+ async with asynctracecontext(output, 1):
+ output.append(2)
+ output.append(3)
+
@jump_test(3, 1, [1, 2, 1, 2, 3, -2])
def test_jump_backwards_out_of_with_block(output):
output.append(1)
with tracecontext(output, 2):
output.append(3)
+ @async_jump_test(3, 1, [1, 2, 1, 2, 3, -2])
+ async def test_jump_backwards_out_of_async_with_block(output):
+ output.append(1)
+ async with asynctracecontext(output, 2):
+ output.append(3)
+
@jump_test(2, 5, [5])
def test_jump_forwards_out_of_try_finally_block(output):
try:
@@ -843,6 +896,14 @@ class JumpTestCase(unittest.TestCase):
with tracecontext(output, 4):
output.append(5)
+ @async_jump_test(2, 4, [1, 4, 5, -4])
+ async def test_jump_across_async_with(output):
+ output.append(1)
+ async with asynctracecontext(output, 2):
+ output.append(3)
+ async with asynctracecontext(output, 4):
+ output.append(5)
+
@jump_test(4, 5, [1, 3, 5, 6])
def test_jump_out_of_with_block_within_for_block(output):
output.append(1)
@@ -852,6 +913,15 @@ class JumpTestCase(unittest.TestCase):
output.append(5)
output.append(6)
+ @async_jump_test(4, 5, [1, 3, 5, 6])
+ async def test_jump_out_of_async_with_block_within_for_block(output):
+ output.append(1)
+ for i in [1]:
+ async with asynctracecontext(output, 3):
+ output.append(4)
+ output.append(5)
+ output.append(6)
+
@jump_test(4, 5, [1, 2, 3, 5, -2, 6])
def test_jump_out_of_with_block_within_with_block(output):
output.append(1)
@@ -861,6 +931,15 @@ class JumpTestCase(unittest.TestCase):
output.append(5)
output.append(6)
+ @async_jump_test(4, 5, [1, 2, 3, 5, -2, 6])
+ async def test_jump_out_of_async_with_block_within_with_block(output):
+ output.append(1)
+ with tracecontext(output, 2):
+ async with asynctracecontext(output, 3):
+ output.append(4)
+ output.append(5)
+ output.append(6)
+
@jump_test(5, 6, [2, 4, 6, 7])
def test_jump_out_of_with_block_within_finally_block(output):
try:
@@ -871,6 +950,16 @@ class JumpTestCase(unittest.TestCase):
output.append(6)
output.append(7)
+ @async_jump_test(5, 6, [2, 4, 6, 7])
+ async def test_jump_out_of_async_with_block_within_finally_block(output):
+ try:
+ output.append(2)
+ finally:
+ async with asynctracecontext(output, 4):
+ output.append(5)
+ output.append(6)
+ output.append(7)
+
@jump_test(8, 11, [1, 3, 5, 11, 12])
def test_jump_out_of_complex_nested_blocks(output):
output.append(1)
@@ -894,6 +983,14 @@ class JumpTestCase(unittest.TestCase):
output.append(4)
output.append(5)
+ @async_jump_test(3, 5, [1, 2, 5])
+ async def test_jump_out_of_async_with_assignment(output):
+ output.append(1)
+ async with asynctracecontext(output, 2) \
+ as x:
+ output.append(4)
+ output.append(5)
+
@jump_test(3, 6, [1, 6, 8, 9])
def test_jump_over_return_in_try_finally_block(output):
output.append(1)
@@ -996,12 +1093,24 @@ class JumpTestCase(unittest.TestCase):
with tracecontext(output, 2):
output.append(3)
+ @async_jump_test(1, 3, [], (ValueError, 'into'))
+ async def test_no_jump_forwards_into_async_with_block(output):
+ output.append(1)
+ async with asynctracecontext(output, 2):
+ output.append(3)
+
@jump_test(3, 2, [1, 2, -1], (ValueError, 'into'))
def test_no_jump_backwards_into_with_block(output):
with tracecontext(output, 1):
output.append(2)
output.append(3)
+ @async_jump_test(3, 2, [1, 2, -1], (ValueError, 'into'))
+ async def test_no_jump_backwards_into_async_with_block(output):
+ async with asynctracecontext(output, 1):
+ output.append(2)
+ output.append(3)
+
@jump_test(1, 3, [], (ValueError, 'into'))
def test_no_jump_forwards_into_try_finally_block(output):
output.append(1)
@@ -1082,6 +1191,14 @@ class JumpTestCase(unittest.TestCase):
with tracecontext(output, 4):
output.append(5)
+ @async_jump_test(3, 5, [1, 2, -2], (ValueError, 'into'))
+ async def test_no_jump_between_async_with_blocks(output):
+ output.append(1)
+ async with asynctracecontext(output, 2):
+ output.append(3)
+ async with asynctracecontext(output, 4):
+ output.append(5)
+
@jump_test(5, 7, [2, 4], (ValueError, 'finally'))
def test_no_jump_over_return_out_of_finally_block(output):
try: