summaryrefslogtreecommitdiff
path: root/Lib/test/test_asyncio/test_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r--Lib/test/test_asyncio/test_tasks.py540
1 files changed, 44 insertions, 496 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index a9e4cf5356..86710872ac 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -33,18 +33,6 @@ async def coroutine_function():
pass
-@contextlib.contextmanager
-def set_coroutine_debug(enabled):
- coroutines = asyncio.coroutines
-
- old_debug = coroutines._DEBUG
- try:
- coroutines._DEBUG = enabled
- yield
- finally:
- coroutines._DEBUG = old_debug
-
-
def format_coroutine(qualname, state, src, source_traceback, generator=False):
if generator:
state = '%s' % state
@@ -234,43 +222,6 @@ class BaseTaskTests:
self.assertTrue(t.done())
self.assertEqual(t.result(), 'ok')
- def test_ensure_future_coroutine_2(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- return 'ok'
- t = asyncio.ensure_future(notmuch(), loop=self.loop)
- self.assertIs(t._loop, self.loop)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
-
- a = notmuch()
- self.addCleanup(a.close)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- asyncio.ensure_future(a)
- self.assertEqual(cm.warnings[0].filename, __file__)
-
- async def test():
- return asyncio.ensure_future(notmuch())
- t = self.loop.run_until_complete(test())
- self.assertIs(t._loop, self.loop)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
-
- # Deprecated in 3.10
- asyncio.set_event_loop(self.loop)
- self.addCleanup(asyncio.set_event_loop, None)
- with self.assertWarns(DeprecationWarning) as cm:
- t = asyncio.ensure_future(notmuch())
- self.assertEqual(cm.warnings[0].filename, __file__)
- self.assertIs(t._loop, self.loop)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
-
def test_ensure_future_future(self):
f_orig = self.new_future(self.loop)
f_orig.set_result('ko')
@@ -318,12 +269,10 @@ class BaseTaskTests:
def __init__(self, coro):
self.coro = coro
def __await__(self):
- return (yield from self.coro)
+ return self.coro.__await__()
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- return 'ok'
+ async def coro():
+ return 'ok'
loop = asyncio.new_event_loop()
self.set_event_loop(loop)
@@ -450,68 +399,6 @@ class BaseTaskTests:
self.assertEqual(t.get_name(), '{6}')
self.loop.run_until_complete(t)
- def test_task_repr_coro_decorator(self):
- self.loop.set_debug(False)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- # notmuch() function doesn't use yield from: it will be wrapped by
- # @coroutine decorator
- return 123
-
- # test coroutine function
- self.assertEqual(notmuch.__name__, 'notmuch')
- self.assertRegex(notmuch.__qualname__,
- r'\w+.test_task_repr_coro_decorator'
- r'\.<locals>\.notmuch')
- self.assertEqual(notmuch.__module__, __name__)
-
- # test coroutine object
- gen = notmuch()
- # On Python >= 3.5, generators now inherit the name of the
- # function, as expected, and have a qualified name (__qualname__
- # attribute).
- coro_name = 'notmuch'
- coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
- '.<locals>.notmuch')
- self.assertEqual(gen.__name__, coro_name)
- self.assertEqual(gen.__qualname__, coro_qualname)
-
- # test repr(CoroWrapper)
- if coroutines._DEBUG:
- # format the coroutine object
- if coroutines._DEBUG:
- filename, lineno = test_utils.get_function_source(notmuch)
- frame = gen._source_traceback[-1]
- coro = ('%s() running, defined at %s:%s, created at %s:%s'
- % (coro_qualname, filename, lineno,
- frame[0], frame[1]))
- else:
- code = gen.gi_code
- coro = ('%s() running at %s:%s'
- % (coro_qualname, code.co_filename,
- code.co_firstlineno))
-
- self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
-
- # test pending Task
- t = self.new_task(self.loop, gen)
- t.add_done_callback(Dummy())
-
- # format the coroutine object
- if coroutines._DEBUG:
- src = '%s:%s' % test_utils.get_function_source(notmuch)
- else:
- code = gen.gi_code
- src = '%s:%s' % (code.co_filename, code.co_firstlineno)
- coro = format_coroutine(coro_qualname, 'running', src,
- t._source_traceback,
- generator=not coroutines._DEBUG)
- self.assertEqual(repr(t),
- "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
- self.loop.run_until_complete(t)
-
def test_task_repr_wait_for(self):
self.loop.set_debug(False)
@@ -527,30 +414,6 @@ class BaseTaskTests:
fut.set_result(None)
self.loop.run_until_complete(task)
- def test_task_repr_partial_corowrapper(self):
- # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
- # coroutine is a partial function
- with set_coroutine_debug(True):
- self.loop.set_debug(True)
-
- async def func(x, y):
- await asyncio.sleep(0)
-
- with self.assertWarns(DeprecationWarning):
- partial_func = asyncio.coroutine(functools.partial(func, 1))
- task = self.loop.create_task(partial_func(2))
-
- # make warnings quiet
- task._log_destroy_pending = False
- self.addCleanup(task._coro.close)
-
- coro_repr = repr(task._coro)
- expected = (
- r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
- r'\.<locals>\.func at'
- )
- self.assertRegex(coro_repr, expected)
-
def test_task_basics(self):
async def outer():
@@ -741,12 +604,10 @@ class BaseTaskTests:
(asyncio.CancelledError, ('my message',), 2))
def test_cancel_yield(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def task():
- yield
- yield
- return 12
+ async def task():
+ await asyncio.sleep(0)
+ await asyncio.sleep(0)
+ return 12
t = self.new_task(self.loop, task())
test_utils.run_briefly(self.loop) # start coro
@@ -1322,10 +1183,8 @@ class BaseTaskTests:
def test_wait_duplicate_coroutines(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
+ async def coro(s):
+ return s
c = coro('test')
task = self.new_task(
self.loop,
@@ -1587,16 +1446,14 @@ class BaseTaskTests:
completed = set()
time_shifted = False
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def sleeper(dt, x):
- nonlocal time_shifted
- yield from asyncio.sleep(dt)
- completed.add(x)
- if not time_shifted and 'a' in completed and 'b' in completed:
- time_shifted = True
- loop.advance_time(0.14)
- return x
+ async def sleeper(dt, x):
+ nonlocal time_shifted
+ await asyncio.sleep(dt)
+ completed.add(x)
+ if not time_shifted and 'a' in completed and 'b' in completed:
+ time_shifted = True
+ loop.advance_time(0.14)
+ return x
a = sleeper(0.01, 'a')
b = sleeper(0.01, 'b')
@@ -1614,10 +1471,6 @@ class BaseTaskTests:
self.assertTrue('b' in res[:2])
self.assertEqual(res[2], 'c')
- # Doing it again should take no time and exercise a different path.
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
-
def test_as_completed_with_timeout(self):
def gen():
@@ -1727,19 +1580,15 @@ class BaseTaskTests:
def test_as_completed_duplicate_coroutines(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
+ async def coro(s):
+ return s
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def runner():
- result = []
- c = coro('ham')
- for f in asyncio.as_completed([c, c, coro('spam')]):
- result.append((yield from f))
- return result
+ async def runner():
+ result = []
+ c = coro('ham')
+ for f in asyncio.as_completed([c, c, coro('spam')]):
+ result.append(await f)
+ return result
fut = self.new_task(self.loop, runner())
self.loop.run_until_complete(fut)
@@ -1900,17 +1749,6 @@ class BaseTaskTests:
self.loop.run_until_complete(task),
'ko')
- def test_step_result(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- yield None
- yield 1
- return 'ko'
-
- self.assertRaises(
- RuntimeError, self.loop.run_until_complete, notmuch())
-
def test_step_result_future(self):
# If coroutine returns future, task waits on this future.
@@ -1983,53 +1821,15 @@ class BaseTaskTests:
yield
self.assertFalse(asyncio.iscoroutinefunction(fn1))
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def fn2():
- yield
+ async def fn2():
+ pass
self.assertTrue(asyncio.iscoroutinefunction(fn2))
self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
- def test_yield_vs_yield_from(self):
- fut = self.new_future(self.loop)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def wait_for_future():
- yield fut
-
- task = wait_for_future()
- with self.assertRaises(RuntimeError):
- self.loop.run_until_complete(task)
-
- self.assertFalse(fut.done())
-
- def test_yield_vs_yield_from_generator(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- yield
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def wait_for_future():
- gen = coro()
- try:
- yield gen
- finally:
- gen.close()
-
- task = wait_for_future()
- self.assertRaises(
- RuntimeError,
- self.loop.run_until_complete, task)
-
def test_coroutine_non_gen_function(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def func():
- return 'test'
+ async def func():
+ return 'test'
self.assertTrue(asyncio.iscoroutinefunction(func))
@@ -2042,10 +1842,8 @@ class BaseTaskTests:
def test_coroutine_non_gen_function_return_future(self):
fut = self.new_future(self.loop)
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def func():
- return fut
+ async def func():
+ return fut
async def coro():
fut.set_result('test')
@@ -2053,7 +1851,7 @@ class BaseTaskTests:
t1 = self.new_task(self.loop, func())
t2 = self.new_task(self.loop, coro())
res = self.loop.run_until_complete(t1)
- self.assertEqual(res, 'test')
+ self.assertEqual(res, fut)
self.assertIsNone(t2.result())
def test_current_task(self):
@@ -2309,136 +2107,15 @@ class BaseTaskTests:
self.assertRaises(ValueError, self.loop.run_until_complete,
asyncio.wait([]))
- def test_corowrapper_mocks_generator(self):
-
- def check():
- # A function that asserts various things.
- # Called twice, with different debug flag values.
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- # The actual coroutine.
- self.assertTrue(gen.gi_running)
- yield from fut
-
- # A completed Future used to run the coroutine.
- fut = self.new_future(self.loop)
- fut.set_result(None)
-
- # Call the coroutine.
- gen = coro()
-
- # Check some properties.
- self.assertTrue(asyncio.iscoroutine(gen))
- self.assertIsInstance(gen.gi_frame, types.FrameType)
- self.assertFalse(gen.gi_running)
- self.assertIsInstance(gen.gi_code, types.CodeType)
-
- # Run it.
- self.loop.run_until_complete(gen)
-
- # The frame should have changed.
- self.assertIsNone(gen.gi_frame)
-
- # Test with debug flag cleared.
- with set_coroutine_debug(False):
- check()
-
- # Test with debug flag set.
- with set_coroutine_debug(True):
- check()
-
- def test_yield_from_corowrapper(self):
- with set_coroutine_debug(True):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t1():
- return (yield from t2())
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t2():
- f = self.new_future(self.loop)
- self.new_task(self.loop, t3(f))
- return (yield from f)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t3(f):
- f.set_result((1, 2, 3))
-
- task = self.new_task(self.loop, t1())
- val = self.loop.run_until_complete(task)
- self.assertEqual(val, (1, 2, 3))
-
- def test_yield_from_corowrapper_send(self):
- def foo():
- a = yield
- return a
-
- def call(arg):
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- try:
- cw.send(arg)
- except StopIteration as ex:
- return ex.args[0]
- else:
- raise AssertionError('StopIteration was expected')
-
- self.assertEqual(call((1, 2)), (1, 2))
- self.assertEqual(call('spam'), 'spam')
-
- def test_corowrapper_weakref(self):
- wd = weakref.WeakValueDictionary()
- def foo(): yield from []
- cw = asyncio.coroutines.CoroWrapper(foo())
- wd['cw'] = cw # Would fail without __weakref__ slot.
- cw.gen = None # Suppress warning from __del__.
-
- def test_corowrapper_throw(self):
- # Issue 429: CoroWrapper.throw must be compatible with gen.throw
- def foo():
- value = None
- while True:
- try:
- value = yield value
- except Exception as e:
- value = e
-
- exception = Exception("foo")
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- self.assertIs(exception, cw.throw(exception))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- self.assertIs(exception, cw.throw(Exception, exception))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- exception = cw.throw(Exception, "foo")
- self.assertIsInstance(exception, Exception)
- self.assertEqual(exception.args, ("foo", ))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- exception = cw.throw(Exception, "foo", None)
- self.assertIsInstance(exception, Exception)
- self.assertEqual(exception.args, ("foo", ))
-
def test_log_destroyed_pending_task(self):
Task = self.__class__.Task
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def kill_me(loop):
- future = self.new_future(loop)
- yield from future
- # at this point, the only reference to kill_me() task is
- # the Task._wakeup() method in future._callbacks
- raise Exception("code never reached")
+ async def kill_me(loop):
+ future = self.new_future(loop)
+ await future
+ # at this point, the only reference to kill_me() task is
+ # the Task._wakeup() method in future._callbacks
+ raise Exception("code never reached")
mock_handler = mock.Mock()
self.loop.set_debug(True)
@@ -2456,8 +2133,6 @@ class BaseTaskTests:
self.loop._run_once()
self.assertEqual(len(self.loop._ready), 0)
- # remove the future used in kill_me(), and references to the task
- del coro.gi_frame.f_locals['future']
coro = None
source_traceback = task._source_traceback
task = None
@@ -2491,62 +2166,6 @@ class BaseTaskTests:
loop.run_until_complete(runner())
self.assertFalse(m_log.error.called)
- @mock.patch('asyncio.coroutines.logger')
- def test_coroutine_never_yielded(self, m_log):
- with set_coroutine_debug(True):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro_noop():
- pass
-
- tb_filename = __file__
- tb_lineno = sys._getframe().f_lineno + 2
- # create a coroutine object but don't use it
- coro_noop()
- support.gc_collect()
-
- self.assertTrue(m_log.error.called)
- message = m_log.error.call_args[0][0]
- func_filename, func_lineno = test_utils.get_function_source(coro_noop)
-
- regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
- r'was never yielded from\n'
- r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
- r'.*\n'
- r' File "%s", line %s, in test_coroutine_never_yielded\n'
- r' coro_noop\(\)$'
- % (re.escape(coro_noop.__qualname__),
- re.escape(func_filename), func_lineno,
- re.escape(tb_filename), tb_lineno))
-
- self.assertRegex(message, re.compile(regex, re.DOTALL))
-
- def test_return_coroutine_from_coroutine(self):
- """Return of @asyncio.coroutine()-wrapped function generator object
- from @asyncio.coroutine()-wrapped function should have same effect as
- returning generator object or Future."""
- def check():
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def outer_coro():
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def inner_coro():
- return 1
-
- return inner_coro()
-
- result = self.loop.run_until_complete(outer_coro())
- self.assertEqual(result, 1)
-
- # Test with debug flag cleared.
- with set_coroutine_debug(False):
- check()
-
- # Test with debug flag set.
- with set_coroutine_debug(True):
- check()
-
def test_task_source_traceback(self):
self.loop.set_debug(True)
@@ -2677,10 +2296,8 @@ class BaseTaskTests:
raise ValueError
self.loop.call_soon = call_soon
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- pass
+ async def coro():
+ pass
self.assertFalse(m_log.error.called)
@@ -2708,23 +2325,6 @@ class BaseTaskTests:
"a coroutine was expected, got 123"):
self.new_task(self.loop, 123)
- def test_create_task_with_oldstyle_coroutine(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- pass
-
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
- # test it for the second time to ensure that caching
- # in asyncio.iscoroutine() doesn't break things.
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
def test_create_task_with_async_function(self):
async def coro():
@@ -3365,7 +2965,7 @@ class GatherTestsBase:
def test_env_var_debug(self):
code = '\n'.join((
'import asyncio.coroutines',
- 'print(asyncio.coroutines._DEBUG)'))
+ 'print(asyncio.coroutines._is_debug_mode())'))
# Test with -E to not fail if the unit test was run with
# PYTHONASYNCIODEBUG set to a non-empty string
@@ -3542,10 +3142,8 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
self.other_loop.run_until_complete(fut)
def test_duplicate_coroutines(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
+ async def coro(s):
+ return s
c = coro('abc')
fut = self._gather(c, c, coro('def'), c)
self._run_loop(self.one_loop)
@@ -3633,9 +3231,7 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase):
# otherwise it spills errors and breaks **other** unittests, since
# 'target' is interacting with threads.
- # With this call, `coro` will be advanced, so that
- # CoroWrapper.__del__ won't do anything when asyncio tests run
- # in debug mode.
+ # With this call, `coro` will be advanced.
self.loop.call_soon_threadsafe(coro.send, None)
try:
return future.result(timeout)
@@ -3771,54 +3367,6 @@ class CompatibilityTests(test_utils.TestCase):
self.loop = None
super().tearDown()
- def test_yield_from_awaitable(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- yield from asyncio.sleep(0)
- return 'ok'
-
- result = self.loop.run_until_complete(coro())
- self.assertEqual('ok', result)
-
- def test_await_old_style_coro(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro1():
- return 'ok1'
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro2():
- yield from asyncio.sleep(0)
- return 'ok2'
-
- async def inner():
- return await asyncio.gather(coro1(), coro2())
-
- result = self.loop.run_until_complete(inner())
- self.assertEqual(['ok1', 'ok2'], result)
-
- def test_debug_mode_interop(self):
- # https://bugs.python.org/issue32636
- code = textwrap.dedent("""
- import asyncio
-
- async def native_coro():
- pass
-
- @asyncio.coroutine
- def old_style_coro():
- yield from native_coro()
-
- asyncio.run(old_style_coro())
- """)
-
- assert_python_ok("-Wignore::DeprecationWarning", "-c", code,
- PYTHONASYNCIODEBUG="1")
-
if __name__ == '__main__':
unittest.main()