"""Tests for Jinja2 templates and environments created with ``enable_async=True``.

Covers direct ``render_async`` use, synchronous ``render``/``generate`` on
async-enabled templates, imports, includes and ``for`` loops.
"""
import asyncio

import pytest
from jinja2 import DictLoader, Environment, Template
from jinja2.asyncsupport import auto_aiter
from jinja2.exceptions import (
    TemplateNotFound,
    TemplatesNotFound,
    UndefinedError,
)


def run(coro):
    """Run *coro* to completion on the current event loop and return its result."""
    # NOTE(review): ``get_event_loop()`` without a running loop is deprecated
    # on newer Python versions.  It is kept here because ``asyncio.run()``
    # would close the loop after every call -- confirm the sync ``render()``
    # path does not rely on a persistent loop before changing this.
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro)


def test_basic_async():
    """A simple loop template renders through ``render_async``."""
    t = Template('{% for item in [1, 2, 3] %}[{{ item }}]{% endfor %}',
                 enable_async=True)

    async def func():
        return await t.render_async()

    rv = run(func())
    assert rv == '[1][2][3]'


def test_await_on_calls():
    """Coroutine and plain function results can be mixed in one expression."""
    t = Template('{{ async_func() + normal_func() }}',
                 enable_async=True)

    async def async_func():
        return 42

    def normal_func():
        return 23

    async def func():
        return await t.render_async(
            async_func=async_func,
            normal_func=normal_func
        )

    rv = run(func())
    assert rv == '65'


def test_await_on_calls_normal_render():
    """Same as ``test_await_on_calls`` but through the synchronous ``render``."""
    t = Template('{{ async_func() + normal_func() }}',
                 enable_async=True)

    async def async_func():
        return 42

    def normal_func():
        return 23

    rv = t.render(
        async_func=async_func,
        normal_func=normal_func
    )
    assert rv == '65'


def test_await_and_macros():
    """A coroutine called from inside a macro body has its result rendered."""
    t = Template('{% macro foo(x) %}[{{ x }}][{{ async_func() }}]'
                 '{% endmacro %}{{ foo(42) }}', enable_async=True)

    async def async_func():
        return 42

    async def func():
        return await t.render_async(async_func=async_func)

    rv = run(func())
    assert rv == '[42][42]'


def test_async_blocks():
    """``self.foo()`` on an empty block renders to an empty string."""
    t = Template('{% block foo %}{% endblock %}{{ self.foo() }}',
                 enable_async=True, autoescape=True)

    async def func():
        return await t.render_async()

    rv = run(func())
    assert rv == ''


def test_async_generate():
    """The synchronous ``generate`` yields one chunk per loop iteration."""
    t = Template('{% for x in [1, 2, 3] %}{{ x }}{% endfor %}',
                 enable_async=True)
    rv = list(t.generate())
    assert rv == ['1', '2', '3']


def test_async_iteration_in_templates():
    """An async generator can be used as the iterable of a ``for`` loop."""
    t = Template('{% for x in rng %}{{ x }}{% endfor %}',
                 enable_async=True)

    async def async_iterator():
        for item in [1, 2, 3]:
            yield item

    rv = list(t.generate(rng=async_iterator()))
    assert rv == ['1', '2', '3']


def test_async_iteration_in_templates_extended():
    """``loop.index0`` is available while iterating an async generator."""
    t = Template('{% for x in rng %}{{ loop.index0 }}/{{ x }}{% endfor %}',
                 enable_async=True)

    async def async_iterator():
        for item in [1, 2, 3]:
            yield item

    rv = list(t.generate(rng=async_iterator()))
    assert rv == ['0/1', '1/2', '2/3']


@pytest.fixture
def test_env_async():
    """Async-enabled environment with in-memory templates and global ``bar=23``."""
    env = Environment(loader=DictLoader(dict(
        module='{% macro test() %}[{{ foo }}|{{ bar }}]{% endmacro %}',
        header='[{{ foo }}|{{ 23 }}]',
        o_printer='({{ o }})'
    )), enable_async=True)
    env.globals['bar'] = 23
    return env


@pytest.mark.imports
class TestAsyncImports(object):
    """``import`` / ``from ... import`` behaviour in an async environment."""

    def test_context_imports(self, test_env_async):
        """Imported macros see render-time variables only ``with context``."""
        t = test_env_async.from_string('{% import "module" as m %}{{ m.test() }}')
        assert t.render(foo=42) == '[|23]'
        t = test_env_async.from_string(
            '{% import "module" as m without context %}{{ m.test() }}'
        )
        assert t.render(foo=42) == '[|23]'
        t = test_env_async.from_string(
            '{% import "module" as m with context %}{{ m.test() }}'
        )
        assert t.render(foo=42) == '[42|23]'
        t = test_env_async.from_string('{% from "module" import test %}{{ test() }}')
        assert t.render(foo=42) == '[|23]'
        t = test_env_async.from_string(
            '{% from "module" import test without context %}{{ test() }}'
        )
        assert t.render(foo=42) == '[|23]'
        t = test_env_async.from_string(
            '{% from "module" import test with context %}{{ test() }}'
        )
        assert t.render(foo=42) == '[42|23]'

    def test_trailing_comma(self, test_env_async):
        """These import-list spellings must all compile without error."""
        test_env_async.from_string('{% from "foo" import bar, baz with context %}')
        test_env_async.from_string('{% from "foo" import bar, baz, with context %}')
        test_env_async.from_string('{% from "foo" import bar, with context %}')
        test_env_async.from_string('{% from "foo" import bar, with, context %}')
        test_env_async.from_string('{% from "foo" import bar, with with context %}')

    def test_exports(self, test_env_async):
        """Top-level names are exported on the module; loop-local macros are not."""
        m = run(test_env_async.from_string('''
            {% macro toplevel() %}...{% endmacro %}
            {% macro __private() %}...{% endmacro %}
            {% set variable = 42 %}
            {% for item in [1] %}
                {% macro notthere() %}{% endmacro %}
            {% endfor %}
        ''')._get_default_module_async())
        assert run(m.toplevel()) == '...'
        # NOTE(review): the template defines ``__private`` but this checks
        # ``__missing``, which is trivially absent -- confirm the intended name.
        assert not hasattr(m, '__missing')
        assert m.variable == 42
        assert not hasattr(m, 'notthere')


@pytest.mark.imports
@pytest.mark.includes
class TestAsyncIncludes(object):
    """``include`` behaviour in an async environment."""

    def test_context_include(self, test_env_async):
        """Includes receive the context unless ``without context`` is given."""
        t = test_env_async.from_string('{% include "header" %}')
        assert t.render(foo=42) == '[42|23]'
        t = test_env_async.from_string('{% include "header" with context %}')
        assert t.render(foo=42) == '[42|23]'
        t = test_env_async.from_string('{% include "header" without context %}')
        assert t.render(foo=42) == '[|23]'

    def test_choice_includes(self, test_env_async):
        """Including a list of names picks the first template that exists."""
        t = test_env_async.from_string('{% include ["missing", "header"] %}')
        assert t.render(foo=42) == '[42|23]'

        t = test_env_async.from_string(
            '{% include ["missing", "missing2"] ignore missing %}'
        )
        assert t.render(foo=42) == ''

        t = test_env_async.from_string('{% include ["missing", "missing2"] %}')
        pytest.raises(TemplateNotFound, t.render)
        with pytest.raises(TemplatesNotFound) as e:
            t.render()

        # Checked after the ``with`` block: nothing inside it runs once
        # ``t.render()`` has raised.
        assert e.value.templates == ['missing', 'missing2']
        assert e.value.name == 'missing2'

        def test_includes(t, **ctx):
            ctx['foo'] = 42
            assert t.render(ctx) == '[42|23]'

        t = test_env_async.from_string('{% include ["missing", "header"] %}')
        test_includes(t)
        t = test_env_async.from_string('{% include x %}')
        test_includes(t, x=['missing', 'header'])
        t = test_env_async.from_string('{% include [x, "header"] %}')
        test_includes(t, x='missing')
        t = test_env_async.from_string('{% include x %}')
        test_includes(t, x='header')
        t = test_env_async.from_string('{% include x %}')
        test_includes(t, x='header')
        t = test_env_async.from_string('{% include [x] %}')
        test_includes(t, x='header')

    def test_include_ignoring_missing(self, test_env_async):
        """``ignore missing`` turns a missing include into empty output."""
        t = test_env_async.from_string('{% include "missing" %}')
        pytest.raises(TemplateNotFound, t.render)
        for extra in '', 'with context', 'without context':
            t = test_env_async.from_string('{% include "missing" ignore missing ' +
                                           extra + ' %}')
            assert t.render() == ''

    def test_context_include_with_overrides(self, test_env_async):
        """An included template sees the enclosing loop variable."""
        # NOTE(review): this environment is built without ``enable_async`` and
        # the fixture is unused -- confirm whether that is intentional.
        env = Environment(loader=DictLoader(dict(
            main="{% for item in [1, 2, 3] %}{% include 'item' %}{% endfor %}",
            item="{{ item }}"
        )))
        assert env.get_template("main").render() == "123"

    def test_unoptimized_scopes(self, test_env_async):
        """An include inside a nested macro sees the outer macro's argument."""
        t = test_env_async.from_string("""
            {% macro outer(o) %}
            {% macro inner() %}
            {% include "o_printer" %}
            {% endmacro %}
            {{ inner() }}
            {% endmacro %}
            {{ outer("FOO") }}
        """)
        assert t.render().strip() == '(FOO)'

    def test_unoptimized_scopes_autoescape(self):
        """Same as ``test_unoptimized_scopes`` with autoescaping enabled."""
        env = Environment(loader=DictLoader(dict(
            o_printer='({{ o }})',
        )), autoescape=True, enable_async=True)
        t = env.from_string("""
            {% macro outer(o) %}
            {% macro inner() %}
            {% include "o_printer" %}
            {% endmacro %}
            {{ inner() }}
            {% endmacro %}
            {{ outer("FOO") }}
        """)
        assert t.render().strip() == '(FOO)'


@pytest.mark.core_tags
@pytest.mark.for_loop
class TestAsyncForLoop(object):
    """``for`` loop behaviour in an async environment."""

    def test_simple(self, test_env_async):
        """Items of a list are rendered in order."""
        tmpl = test_env_async.from_string('{% for item in seq %}{{ item }}{% endfor %}')
        assert tmpl.render(seq=list(range(10))) == '0123456789'

    def test_else(self, test_env_async):
        """The ``else`` branch renders when the iterable is undefined."""
        tmpl = test_env_async.from_string(
            '{% for item in seq %}XXX{% else %}...{% endfor %}')
        assert tmpl.render() == '...'

    def test_empty_blocks(self, test_env_async):
        """Empty loop body and empty ``else`` produce no output."""
        tmpl = test_env_async.from_string('<{% for item in seq %}{% else %}{% endfor %}>')
        assert tmpl.render() == '<>'

    @pytest.mark.parametrize(
        "transform",
        [lambda x: x, iter, reversed, lambda x: (i for i in x), auto_aiter]
    )
    def test_context_vars(self, test_env_async, transform):
        """Loop helper variables are correct for each kind of iterable."""
        t = test_env_async.from_string(
            "{% for item in seq %}{{ loop.index }}|{{ loop.index0 }}"
            "|{{ loop.revindex }}|{{ loop.revindex0 }}|{{ loop.first }}"
            "|{{ loop.last }}|{{ loop.length }}\n{% endfor %}"
        )
        out = t.render(seq=transform([42, 24]))
        assert out == "1|0|2|1|True|False|2\n2|1|1|0|False|True|2\n"

    def test_cycling(self, test_env_async):
        """``loop.cycle`` alternates through its arguments, literal or unpacked."""
        # Line breaks sit inside tags, where whitespace is insignificant.
        tmpl = test_env_async.from_string('''{% for item in seq %}{{
            loop.cycle('<1>', '<2>') }}{% endfor %}{%
            for item in seq %}{{ loop.cycle(*through) }}{% endfor %}''')
        output = tmpl.render(seq=list(range(4)), through=('<1>', '<2>'))
        assert output == '<1><2>' * 4

    def test_lookaround(self, test_env_async):
        """``loop.previtem`` / ``loop.nextitem`` are undefined at the edges."""
        tmpl = test_env_async.from_string('''{% for item in seq -%}
            {{ loop.previtem|default('x') }}-{{ item }}-{{
            loop.nextitem|default('x') }}|
        {%- endfor %}''')
        output = tmpl.render(seq=list(range(4)))
        assert output == 'x-0-1|0-1-2|1-2-3|2-3-x|'

    def test_changed(self, test_env_async):
        """``loop.changed`` is true only when the value differs from the last one."""
        tmpl = test_env_async.from_string('''{% for item in seq -%}
            {{ loop.changed(item) }},
        {%- endfor %}''')
        output = tmpl.render(seq=[None, None, 1, 2, 2, 3, 4, 4, 4])
        assert output == 'True,False,True,True,False,True,True,False,False,'

    def test_scope(self, test_env_async):
        """The loop variable is not visible after the loop."""
        tmpl = test_env_async.from_string('{% for item in seq %}{% endfor %}{{ item }}')
        output = tmpl.render(seq=list(range(10)))
        assert not output

    def test_varlen(self, test_env_async):
        """A plain generator can be iterated."""
        def inner():
            for item in range(5):
                yield item

        tmpl = test_env_async.from_string('{% for item in iter %}{{ item }}{% endfor %}')
        output = tmpl.render(iter=inner())
        assert output == '01234'

    def test_noniter(self, test_env_async):
        """Looping over ``none`` raises ``TypeError``."""
        tmpl = test_env_async.from_string('{% for item in none %}...{% endfor %}')
        pytest.raises(TypeError, tmpl.render)

    def test_recursive(self, test_env_async):
        """``loop(...)`` re-enters a recursive loop with a nested sequence."""
        tmpl = test_env_async.from_string('''{% for item in seq recursive -%}
            [{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
        {%- endfor %}''')
        assert tmpl.render(seq=[
            dict(a=1, b=[dict(a=1), dict(a=2)]),
            dict(a=2, b=[dict(a=1), dict(a=2)]),
            dict(a=3, b=[dict(a='a')])
        ]) == '[1<[1][2]>][2<[1][2]>][3<[a]>]'

    def test_recursive_lookaround(self, test_env_async):
        """``previtem`` / ``nextitem`` are tracked per recursion level."""
        tmpl = test_env_async.from_string('''{% for item in seq recursive -%}
            [{{ loop.previtem.a if loop.previtem is defined else 'x' }}.{{
            item.a }}.{{ loop.nextitem.a if loop.nextitem is defined else 'x'
            }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
        {%- endfor %}''')
        assert tmpl.render(seq=[
            dict(a=1, b=[dict(a=1), dict(a=2)]),
            dict(a=2, b=[dict(a=1), dict(a=2)]),
            dict(a=3, b=[dict(a='a')])
        ]) == '[x.1.2<[x.1.2][1.2.x]>][1.2.3<[x.1.2][1.2.x]>][2.3.x<[x.a.x]>]'

    def test_recursive_depth0(self, test_env_async):
        """``loop.depth0`` starts at 0 and grows by one per recursion level."""
        tmpl = test_env_async.from_string('''{% for item in seq recursive -%}
            [{{ loop.depth0 }}:{{ item.a }}{% if item.b %}<{{
            loop(item.b) }}>{% endif %}]
        {%- endfor %}''')
        assert tmpl.render(seq=[
            dict(a=1, b=[dict(a=1), dict(a=2)]),
            dict(a=2, b=[dict(a=1), dict(a=2)]),
            dict(a=3, b=[dict(a='a')])
        ]) == '[0:1<[1:1][1:2]>][0:2<[1:1][1:2]>][0:3<[1:a]>]'

    def test_recursive_depth(self, test_env_async):
        """``loop.depth`` starts at 1 and grows by one per recursion level."""
        tmpl = test_env_async.from_string('''{% for item in seq recursive -%}
            [{{ loop.depth }}:{{ item.a }}{% if item.b %}<{{
            loop(item.b) }}>{% endif %}]
        {%- endfor %}''')
        assert tmpl.render(seq=[
            dict(a=1, b=[dict(a=1), dict(a=2)]),
            dict(a=2, b=[dict(a=1), dict(a=2)]),
            dict(a=3, b=[dict(a='a')])
        ]) == '[1:1<[2:1][2:2]>][1:2<[2:1][2:2]>][1:3<[2:a]>]'

    def test_looploop(self, test_env_async):
        """An outer ``loop`` saved with ``set`` stays usable in an inner loop."""
        tmpl = test_env_async.from_string('''{% for row in table %}
            {%- set rowloop = loop -%}
            {% for cell in row -%}
                [{{ rowloop.index }}|{{ loop.index }}]
            {%- endfor %}
        {%- endfor %}''')
        assert tmpl.render(table=['ab', 'cd']) == '[1|1][1|2][2|1][2|2]'

    def test_reversed_bug(self, test_env_async):
        """``loop.last`` works when iterating a ``reversed`` object."""
        tmpl = test_env_async.from_string('{% for i in items %}{{ i }}'
                                          '{% if not loop.last %}'
                                          ',{% endif %}{% endfor %}')
        assert tmpl.render(items=reversed([3, 2, 1])) == '1,2,3'

    def test_loop_errors(self, test_env_async):
        """``loop`` is undefined in the loop filter and in the ``else`` branch."""
        tmpl = test_env_async.from_string(
            '''{% for item in [1] if loop.index == 0 %}...{% endfor %}''')
        pytest.raises(UndefinedError, tmpl.render)
        tmpl = test_env_async.from_string(
            '''{% for item in [] %}...{% else %}{{ loop }}{% endfor %}''')
        assert tmpl.render() == ''

    def test_loop_filter(self, test_env_async):
        """A loop filter skips items, and ``loop.index`` counts only kept items."""
        tmpl = test_env_async.from_string('{% for item in range(10) if item '
                                          'is even %}[{{ item }}]{% endfor %}')
        assert tmpl.render() == '[0][2][4][6][8]'
        tmpl = test_env_async.from_string('''
            {%- for item in range(10) if item is even %}[{{
                loop.index }}:{{ item }}]{% endfor %}''')
        assert tmpl.render() == '[1:0][2:2][3:4][4:6][5:8]'

    def test_scoped_special_var(self, test_env_async):
        """Inside a nested loop, ``loop`` refers to the innermost loop."""
        t = test_env_async.from_string(
            '{% for s in seq %}[{{ loop.first }}{% for c in s %}'
            '|{{ loop.first }}{% endfor %}]{% endfor %}')
        assert (t.render(seq=('ab', 'cd'))
                == '[True|True|False][False|True|False]')

    def test_scoped_loop_var(self, test_env_async):
        """``loop.first`` is evaluated against the loop it is written in."""
        t = test_env_async.from_string('{% for x in seq %}{{ loop.first }}'
                                       '{% for y in seq %}{% endfor %}{% endfor %}')
        assert t.render(seq='ab') == 'TrueFalse'
        t = test_env_async.from_string('{% for x in seq %}{% for y in seq %}'
                                       '{{ loop.first }}{% endfor %}{% endfor %}')
        assert t.render(seq='ab') == 'TrueFalseTrueFalse'

    def test_recursive_empty_loop_iter(self, test_env_async):
        """A recursive loop over an empty list renders nothing."""
        t = test_env_async.from_string('''
        {%- for item in foo recursive -%}{%- endfor -%}
        ''')
        assert t.render(dict(foo=[])) == ''

    def test_call_in_loop(self, test_env_async):
        """A ``call`` block inside a loop can use the loop variable."""
        t = test_env_async.from_string('''
        {%- macro do_something() -%}
            [{{ caller() }}]
        {%- endmacro %}

        {%- for i in [1, 2, 3] %}
            {%- call do_something() -%}
                {{ i }}
            {%- endcall %}
        {%- endfor -%}
        ''')
        assert t.render() == '[1][2][3]'

    def test_scoping_bug(self, test_env_async):
        """A macro named like an earlier loop variable is still callable."""
        t = test_env_async.from_string('''
        {%- for item in foo %}...{{ item }}...{% endfor %}
        {%- macro item(a) %}...{{ a }}...{% endmacro %}
        {{- item(2) -}}
        ''')
        assert t.render(foo=(1,)) == '...1......2...'

    def test_unpacking(self, test_env_async):
        """Loop targets can unpack each item into several names."""
        tmpl = test_env_async.from_string('{% for a, b, c in [[1, 2, 3]] %}'
                                          '{{ a }}|{{ b }}|{{ c }}{% endfor %}')
        assert tmpl.render() == '1|2|3'

    def test_recursive_loop_filter(self, test_env_async):
        """A loop filter is applied at every level of a recursive loop."""
        t = test_env_async.from_string('''
        {%- for page in [site.root] if page.url != this recursive %}
        {{ page.url }}
        {{- loop(page.children) }}
        {%- endfor %}
        ''')
        sm = t.render(this='/foo', site={'root': {
            'url': '/',
            'children': [
                {'url': '/foo'},
                {'url': '/bar'},
            ]
        }})
        lines = [x.strip() for x in sm.splitlines() if x.strip()]
        # Blank lines are dropped by the comprehension above, so ``lines`` can
        # never contain ''.  Only the URLs that pass ``page.url != this`` remain.
        # NOTE(review): the previous expectation had '' placeholders, which
        # suggests literal markup lines were lost from the template and the
        # expected list -- restore them if the original text is available.
        assert lines == ['/', '/bar']

    def test_nonrecursive_loop_filter(self, test_env_async):
        """A loop filter drops the item whose url equals ``this``."""
        t = test_env_async.from_string('''
        {%- for page in items if page.url != this %}
        {{ page.url }}
        {%- endfor %}
        ''')
        sm = t.render(this='/foo', items=[
            {'url': '/'},
            {'url': '/foo'},
            {'url': '/bar'},
        ])
        lines = [x.strip() for x in sm.splitlines() if x.strip()]
        # Blank lines are dropped by the comprehension above, so ``lines`` can
        # never contain ''.  Only the URLs that pass the filter remain.
        # NOTE(review): the previous expectation had '' placeholders, which
        # suggests literal markup lines were lost -- restore if available.
        assert lines == ['/', '/bar']

    def test_bare_async(self, test_env_async):
        """A template consisting only of ``extends`` renders its parent."""
        t = test_env_async.from_string('{% extends "header" %}')
        assert t.render(foo=42) == '[42|23]'