diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 540 |
1 files changed, 44 insertions, 496 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index a9e4cf5356..86710872ac 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -33,18 +33,6 @@ async def coroutine_function(): pass -@contextlib.contextmanager -def set_coroutine_debug(enabled): - coroutines = asyncio.coroutines - - old_debug = coroutines._DEBUG - try: - coroutines._DEBUG = enabled - yield - finally: - coroutines._DEBUG = old_debug - - def format_coroutine(qualname, state, src, source_traceback, generator=False): if generator: state = '%s' % state @@ -234,43 +222,6 @@ class BaseTaskTests: self.assertTrue(t.done()) self.assertEqual(t.result(), 'ok') - def test_ensure_future_coroutine_2(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def notmuch(): - return 'ok' - t = asyncio.ensure_future(notmuch(), loop=self.loop) - self.assertIs(t._loop, self.loop) - self.loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertEqual(t.result(), 'ok') - - a = notmuch() - self.addCleanup(a.close) - with self.assertWarns(DeprecationWarning) as cm: - with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'): - asyncio.ensure_future(a) - self.assertEqual(cm.warnings[0].filename, __file__) - - async def test(): - return asyncio.ensure_future(notmuch()) - t = self.loop.run_until_complete(test()) - self.assertIs(t._loop, self.loop) - self.loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertEqual(t.result(), 'ok') - - # Deprecated in 3.10 - asyncio.set_event_loop(self.loop) - self.addCleanup(asyncio.set_event_loop, None) - with self.assertWarns(DeprecationWarning) as cm: - t = asyncio.ensure_future(notmuch()) - self.assertEqual(cm.warnings[0].filename, __file__) - self.assertIs(t._loop, self.loop) - self.loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertEqual(t.result(), 'ok') - def test_ensure_future_future(self): f_orig = self.new_future(self.loop) f_orig.set_result('ko') @@ -318,12 +269,10 @@ class BaseTaskTests: def __init__(self, coro): self.coro = coro def __await__(self): - return (yield from self.coro) + return self.coro.__await__() - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - return 'ok' + async def coro(): + return 'ok' loop = asyncio.new_event_loop() self.set_event_loop(loop) @@ -450,68 +399,6 @@ class BaseTaskTests: self.assertEqual(t.get_name(), '{6}') self.loop.run_until_complete(t) - def test_task_repr_coro_decorator(self): - self.loop.set_debug(False) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def notmuch(): - # notmuch() function doesn't use yield from: it will be wrapped by - # @coroutine decorator - return 123 - - # test coroutine function - self.assertEqual(notmuch.__name__, 'notmuch') - self.assertRegex(notmuch.__qualname__, - r'\w+.test_task_repr_coro_decorator' - r'\.<locals>\.notmuch') - self.assertEqual(notmuch.__module__, __name__) - - # test coroutine object - gen = notmuch() - # On Python >= 3.5, generators now inherit the name of the - # function, as expected, and have a qualified name (__qualname__ - # attribute). - coro_name = 'notmuch' - coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' - '.<locals>.notmuch') - self.assertEqual(gen.__name__, coro_name) - self.assertEqual(gen.__qualname__, coro_qualname) - - # test repr(CoroWrapper) - if coroutines._DEBUG: - # format the coroutine object - if coroutines._DEBUG: - filename, lineno = test_utils.get_function_source(notmuch) - frame = gen._source_traceback[-1] - coro = ('%s() running, defined at %s:%s, created at %s:%s' - % (coro_qualname, filename, lineno, - frame[0], frame[1])) - else: - code = gen.gi_code - coro = ('%s() running at %s:%s' - % (coro_qualname, code.co_filename, - code.co_firstlineno)) - - self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) - - # test pending Task - t = self.new_task(self.loop, gen) - t.add_done_callback(Dummy()) - - # format the coroutine object - if coroutines._DEBUG: - src = '%s:%s' % test_utils.get_function_source(notmuch) - else: - code = gen.gi_code - src = '%s:%s' % (code.co_filename, code.co_firstlineno) - coro = format_coroutine(coro_qualname, 'running', src, - t._source_traceback, - generator=not coroutines._DEBUG) - self.assertEqual(repr(t), - "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) - self.loop.run_until_complete(t) - def test_task_repr_wait_for(self): self.loop.set_debug(False) @@ -527,30 +414,6 @@ class BaseTaskTests: fut.set_result(None) self.loop.run_until_complete(task) - def test_task_repr_partial_corowrapper(self): - # Issue #222: repr(CoroWrapper) must not fail in debug mode if the - # coroutine is a partial function - with set_coroutine_debug(True): - self.loop.set_debug(True) - - async def func(x, y): - await asyncio.sleep(0) - - with self.assertWarns(DeprecationWarning): - partial_func = asyncio.coroutine(functools.partial(func, 1)) - task = self.loop.create_task(partial_func(2)) - - # make warnings quiet - task._log_destroy_pending = False - self.addCleanup(task._coro.close) - - coro_repr = repr(task._coro) - expected = ( - r'<coroutine object \w+\.test_task_repr_partial_corowrapper' - r'\.<locals>\.func at' - ) - self.assertRegex(coro_repr, expected) - def test_task_basics(self): async def outer(): @@ -741,12 +604,10 @@ class BaseTaskTests: (asyncio.CancelledError, ('my message',), 2)) def test_cancel_yield(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def task(): - yield - yield - return 12 + async def task(): + await asyncio.sleep(0) + await asyncio.sleep(0) + return 12 t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) # start coro @@ -1322,10 +1183,8 @@ class BaseTaskTests: def test_wait_duplicate_coroutines(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(s): - return s + async def coro(s): + return s c = coro('test') task = self.new_task( self.loop, @@ -1587,16 +1446,14 @@ class BaseTaskTests: completed = set() time_shifted = False - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def sleeper(dt, x): - nonlocal time_shifted - yield from asyncio.sleep(dt) - completed.add(x) - if not time_shifted and 'a' in completed and 'b' in completed: - time_shifted = True - loop.advance_time(0.14) - return x + async def sleeper(dt, x): + nonlocal time_shifted + await asyncio.sleep(dt) + completed.add(x) + if not time_shifted and 'a' in completed and 'b' in completed: + time_shifted = True + loop.advance_time(0.14) + return x a = sleeper(0.01, 'a') b = sleeper(0.01, 'b') @@ -1614,10 +1471,6 @@ class BaseTaskTests: self.assertTrue('b' in res[:2]) self.assertEqual(res[2], 'c') - # Doing it again should take no time and exercise a different path. - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) - def test_as_completed_with_timeout(self): def gen(): @@ -1727,19 +1580,15 @@ class BaseTaskTests: def test_as_completed_duplicate_coroutines(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(s): - return s + async def coro(s): + return s - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def runner(): - result = [] - c = coro('ham') - for f in asyncio.as_completed([c, c, coro('spam')]): - result.append((yield from f)) - return result + async def runner(): + result = [] + c = coro('ham') + for f in asyncio.as_completed([c, c, coro('spam')]): + result.append(await f) + return result fut = self.new_task(self.loop, runner()) self.loop.run_until_complete(fut) @@ -1900,17 +1749,6 @@ class BaseTaskTests: self.loop.run_until_complete(task), 'ko') - def test_step_result(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def notmuch(): - yield None - yield 1 - return 'ko' - - self.assertRaises( - RuntimeError, self.loop.run_until_complete, notmuch()) - def test_step_result_future(self): # If coroutine returns future, task waits on this future. @@ -1983,53 +1821,15 @@ class BaseTaskTests: yield self.assertFalse(asyncio.iscoroutinefunction(fn1)) - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def fn2(): - yield + async def fn2(): + pass self.assertTrue(asyncio.iscoroutinefunction(fn2)) self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) - def test_yield_vs_yield_from(self): - fut = self.new_future(self.loop) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def wait_for_future(): - yield fut - - task = wait_for_future() - with self.assertRaises(RuntimeError): - self.loop.run_until_complete(task) - - self.assertFalse(fut.done()) - - def test_yield_vs_yield_from_generator(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - yield - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def wait_for_future(): - gen = coro() - try: - yield gen - finally: - gen.close() - - task = wait_for_future() - self.assertRaises( - RuntimeError, - self.loop.run_until_complete, task) - def test_coroutine_non_gen_function(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def func(): - return 'test' + async def func(): + return 'test' self.assertTrue(asyncio.iscoroutinefunction(func)) @@ -2042,10 +1842,8 @@ class BaseTaskTests: def test_coroutine_non_gen_function_return_future(self): fut = self.new_future(self.loop) - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def func(): - return fut + async def func(): + return fut async def coro(): fut.set_result('test') @@ -2053,7 +1851,7 @@ class BaseTaskTests: t1 = self.new_task(self.loop, func()) t2 = self.new_task(self.loop, coro()) res = self.loop.run_until_complete(t1) - self.assertEqual(res, 'test') + self.assertEqual(res, fut) self.assertIsNone(t2.result()) def test_current_task(self): @@ -2309,136 +2107,15 @@ class BaseTaskTests: self.assertRaises(ValueError, self.loop.run_until_complete, asyncio.wait([])) - def test_corowrapper_mocks_generator(self): - - def check(): - # A function that asserts various things. - # Called twice, with different debug flag values. - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - # The actual coroutine. - self.assertTrue(gen.gi_running) - yield from fut - - # A completed Future used to run the coroutine. - fut = self.new_future(self.loop) - fut.set_result(None) - - # Call the coroutine. - gen = coro() - - # Check some properties. - self.assertTrue(asyncio.iscoroutine(gen)) - self.assertIsInstance(gen.gi_frame, types.FrameType) - self.assertFalse(gen.gi_running) - self.assertIsInstance(gen.gi_code, types.CodeType) - - # Run it. - self.loop.run_until_complete(gen) - - # The frame should have changed. - self.assertIsNone(gen.gi_frame) - - # Test with debug flag cleared. - with set_coroutine_debug(False): - check() - - # Test with debug flag set. - with set_coroutine_debug(True): - check() - - def test_yield_from_corowrapper(self): - with set_coroutine_debug(True): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def t1(): - return (yield from t2()) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def t2(): - f = self.new_future(self.loop) - self.new_task(self.loop, t3(f)) - return (yield from f) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def t3(f): - f.set_result((1, 2, 3)) - - task = self.new_task(self.loop, t1()) - val = self.loop.run_until_complete(task) - self.assertEqual(val, (1, 2, 3)) - - def test_yield_from_corowrapper_send(self): - def foo(): - a = yield - return a - - def call(arg): - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - try: - cw.send(arg) - except StopIteration as ex: - return ex.args[0] - else: - raise AssertionError('StopIteration was expected') - - self.assertEqual(call((1, 2)), (1, 2)) - self.assertEqual(call('spam'), 'spam') - - def test_corowrapper_weakref(self): - wd = weakref.WeakValueDictionary() - def foo(): yield from [] - cw = asyncio.coroutines.CoroWrapper(foo()) - wd['cw'] = cw # Would fail without __weakref__ slot. - cw.gen = None # Suppress warning from __del__. - - def test_corowrapper_throw(self): - # Issue 429: CoroWrapper.throw must be compatible with gen.throw - def foo(): - value = None - while True: - try: - value = yield value - except Exception as e: - value = e - - exception = Exception("foo") - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - self.assertIs(exception, cw.throw(exception)) - - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - self.assertIs(exception, cw.throw(Exception, exception)) - - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - exception = cw.throw(Exception, "foo") - self.assertIsInstance(exception, Exception) - self.assertEqual(exception.args, ("foo", )) - - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - exception = cw.throw(Exception, "foo", None) - self.assertIsInstance(exception, Exception) - self.assertEqual(exception.args, ("foo", )) - def test_log_destroyed_pending_task(self): Task = self.__class__.Task - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def kill_me(loop): - future = self.new_future(loop) - yield from future - # at this point, the only reference to kill_me() task is - # the Task._wakeup() method in future._callbacks - raise Exception("code never reached") + async def kill_me(loop): + future = self.new_future(loop) + await future + # at this point, the only reference to kill_me() task is + # the Task._wakeup() method in future._callbacks + raise Exception("code never reached") mock_handler = mock.Mock() self.loop.set_debug(True) @@ -2456,8 +2133,6 @@ class BaseTaskTests: self.loop._run_once() self.assertEqual(len(self.loop._ready), 0) - # remove the future used in kill_me(), and references to the task - del coro.gi_frame.f_locals['future'] coro = None source_traceback = task._source_traceback task = None @@ -2491,62 +2166,6 @@ class BaseTaskTests: loop.run_until_complete(runner()) self.assertFalse(m_log.error.called) - @mock.patch('asyncio.coroutines.logger') - def test_coroutine_never_yielded(self, m_log): - with set_coroutine_debug(True): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro_noop(): - pass - - tb_filename = __file__ - tb_lineno = sys._getframe().f_lineno + 2 - # create a coroutine object but don't use it - coro_noop() - support.gc_collect() - - self.assertTrue(m_log.error.called) - message = m_log.error.call_args[0][0] - func_filename, func_lineno = test_utils.get_function_source(coro_noop) - - regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> ' - r'was never yielded from\n' - r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n' - r'.*\n' - r' File "%s", line %s, in test_coroutine_never_yielded\n' - r' coro_noop\(\)$' - % (re.escape(coro_noop.__qualname__), - re.escape(func_filename), func_lineno, - re.escape(tb_filename), tb_lineno)) - - self.assertRegex(message, re.compile(regex, re.DOTALL)) - - def test_return_coroutine_from_coroutine(self): - """Return of @asyncio.coroutine()-wrapped function generator object - from @asyncio.coroutine()-wrapped function should have same effect as - returning generator object or Future.""" - def check(): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def outer_coro(): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def inner_coro(): - return 1 - - return inner_coro() - - result = self.loop.run_until_complete(outer_coro()) - self.assertEqual(result, 1) - - # Test with debug flag cleared. - with set_coroutine_debug(False): - check() - - # Test with debug flag set. - with set_coroutine_debug(True): - check() - def test_task_source_traceback(self): self.loop.set_debug(True) @@ -2677,10 +2296,8 @@ class BaseTaskTests: raise ValueError self.loop.call_soon = call_soon - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - pass + async def coro(): + pass self.assertFalse(m_log.error.called) @@ -2708,23 +2325,6 @@ class BaseTaskTests: "a coroutine was expected, got 123"): self.new_task(self.loop, 123) - def test_create_task_with_oldstyle_coroutine(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - pass - - task = self.new_task(self.loop, coro()) - self.assertIsInstance(task, self.Task) - self.loop.run_until_complete(task) - - # test it for the second time to ensure that caching - # in asyncio.iscoroutine() doesn't break things. - task = self.new_task(self.loop, coro()) - self.assertIsInstance(task, self.Task) - self.loop.run_until_complete(task) - def test_create_task_with_async_function(self): async def coro(): @@ -3365,7 +2965,7 @@ class GatherTestsBase: def test_env_var_debug(self): code = '\n'.join(( 'import asyncio.coroutines', - 'print(asyncio.coroutines._DEBUG)')) + 'print(asyncio.coroutines._is_debug_mode())')) # Test with -E to not fail if the unit test was run with # PYTHONASYNCIODEBUG set to a non-empty string @@ -3542,10 +3142,8 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): self.other_loop.run_until_complete(fut) def test_duplicate_coroutines(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(s): - return s + async def coro(s): + return s c = coro('abc') fut = self._gather(c, c, coro('def'), c) self._run_loop(self.one_loop) @@ -3633,9 +3231,7 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): # otherwise it spills errors and breaks **other** unittests, since # 'target' is interacting with threads. - # With this call, `coro` will be advanced, so that - # CoroWrapper.__del__ won't do anything when asyncio tests run - # in debug mode. + # With this call, `coro` will be advanced. self.loop.call_soon_threadsafe(coro.send, None) try: return future.result(timeout) @@ -3771,54 +3367,6 @@ class CompatibilityTests(test_utils.TestCase): self.loop = None super().tearDown() - def test_yield_from_awaitable(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - yield from asyncio.sleep(0) - return 'ok' - - result = self.loop.run_until_complete(coro()) - self.assertEqual('ok', result) - - def test_await_old_style_coro(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro1(): - return 'ok1' - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro2(): - yield from asyncio.sleep(0) - return 'ok2' - - async def inner(): - return await asyncio.gather(coro1(), coro2()) - - result = self.loop.run_until_complete(inner()) - self.assertEqual(['ok1', 'ok2'], result) - - def test_debug_mode_interop(self): - # https://bugs.python.org/issue32636 - code = textwrap.dedent(""" - import asyncio - - async def native_coro(): - pass - - @asyncio.coroutine - def old_style_coro(): - yield from native_coro() - - asyncio.run(old_style_coro()) - """) - - assert_python_ok("-Wignore::DeprecationWarning", "-c", code, - PYTHONASYNCIODEBUG="1") - if __name__ == '__main__': unittest.main() |