From 6c83ef761beb162981615fba1c22dc1c0f380568 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 29 Nov 2013 14:36:24 -0500 Subject: - New improvements to the :func:`.text` construct, including more flexible ways to set up bound parameters and return types; in particular, a :func:`.text` can now be turned into a full FROM-object, embeddable in other statements as an alias or CTE using the new method :meth:`.TextClause.columns`. [ticket:2877] --- test/sql/test_compiler.py | 166 --------------------- test/sql/test_generative.py | 6 +- test/sql/test_text.py | 348 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 351 insertions(+), 169 deletions(-) create mode 100644 test/sql/test_text.py (limited to 'test/sql') diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index f1f852ddc..36dfa2ff1 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -1134,172 +1134,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=mysql.dialect() ) - def test_text(self): - self.assert_compile( - text("select * from foo where lala = bar"), - "select * from foo where lala = bar" - ) - - # test bytestring - self.assert_compile(select( - ["foobar(a)", "pk_foo_bar(syslaal)"], - "a = 12", - from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] - ), - "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " - "left outer join lala on foobar.foo = lala.foo WHERE a = 12" - ) - - # test unicode - self.assert_compile(select( - ["foobar(a)", "pk_foo_bar(syslaal)"], - "a = 12", - from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] - ), - "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " - "left outer join lala on foobar.foo = lala.foo WHERE a = 12" - ) - - # test building a select query programmatically with text - s = select() - s.append_column("column1") - s.append_column("column2") - s.append_whereclause("column1=12") - s.append_whereclause("column2=19") - s = s.order_by("column1") - s.append_from("table1") - self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE " - "column1=12 AND column2=19 ORDER BY column1") - - self.assert_compile( - select(["column1", "column2"], - from_obj=table1).alias('somealias').select(), - "SELECT somealias.column1, somealias.column2 FROM " - "(SELECT column1, column2 FROM mytable) AS somealias" - ) - - # test that use_labels doesnt interfere with literal columns - self.assert_compile( - select(["column1", "column2", table1.c.myid], from_obj=table1, - use_labels=True), - "SELECT column1, column2, mytable.myid AS mytable_myid " - "FROM mytable" - ) - - # test that use_labels doesnt interfere - # with literal columns that have textual labels - self.assert_compile( - select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], - from_obj=table1, use_labels=True), - "SELECT column1 AS foobar, column2 AS hoho, " - "mytable.myid AS mytable_myid FROM mytable" - ) - - # test that "auto-labeling of subquery columns" - # doesnt interfere with literal columns, - # exported columns dont get quoted - self.assert_compile( - select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], - from_obj=[table1]).select(), - "SELECT column1 AS foobar, column2 AS hoho, myid FROM " - "(SELECT column1 AS foobar, column2 AS hoho, " - "mytable.myid AS myid FROM mytable)" - ) - - self.assert_compile( - select(['col1', 'col2'], from_obj='tablename').alias('myalias'), - "SELECT col1, col2 FROM tablename" - ) - - def test_binds_in_text(self): - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=:bar and hoho=:whee", - checkparams={'bar': 4, 'whee': 7}, - ) - - self.assert_compile( - text("select * from foo where clock='05:06:07'"), - "select * from foo where clock='05:06:07'", - checkparams={}, - params={}, - ) - - dialect = postgresql.dialect() - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=%(bar)s and hoho=%(whee)s", - checkparams={'bar': 4, 'whee': 7}, - dialect=dialect - ) - - # test escaping out text() params with a backslash - self.assert_compile( - text("select * from foo where clock='05:06:07' " - "and mork='\:mindy'"), - "select * from foo where clock='05:06:07' and mork=':mindy'", - checkparams={}, - params={}, - dialect=dialect - ) - - dialect = sqlite.dialect() - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=? and hoho=?", - checkparams={'bar': 4, 'whee': 7}, - dialect=dialect - ) - - self.assert_compile(select( - [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], - and_( - "foo.id = foofoo(lala)", - "datetime(foo) = Today", - table1.c.myid == table2.c.otherid, - ) - ), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, sysdate(), foo, bar, lala " - "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND " - "datetime(foo) = Today AND mytable.myid = myothertable.otherid") - - self.assert_compile(select( - [alias(table1, 't'), "foo.f"], - "foo.f = t.id", - from_obj=["(select f from bar where lala=heyhey) foo"] - ), - "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, " - "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id") - - # test Text embedded within select_from(), using binds - generate_series = text( - "generate_series(:x, :y, :z) as s(a)", - bindparams=[bindparam('x', None), - bindparam('y', None), bindparam('z', None)] - ) - - s = select([ - (func.current_date() + - literal_column("s.a")).label("dates") - ]).select_from(generate_series) - self.assert_compile( - s, - "SELECT CURRENT_DATE + s.a AS dates FROM " - "generate_series(:x, :y, :z) as s(a)", - checkparams={'y': None, 'x': None, 'z': None} - ) - - self.assert_compile( - s.params(x=5, y=6, z=7), - "SELECT CURRENT_DATE + s.a AS dates FROM " - "generate_series(:x, :y, :z) as s(a)", - checkparams={'y': 6, 'x': 5, 'z': 7} - ) - @testing.emits_warning('.*empty sequence.*') def test_render_binds_as_literal(self): """test a compiler that renders binds inline into diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 09b20d8ea..5a65cecef 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -428,13 +428,13 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): class Vis(CloningVisitor): def visit_textclause(self, text): text.text = text.text + " SOME MODIFIER=:lala" - text.bindparams['lala'] = bindparam('lala') + text._bindparams['lala'] = bindparam('lala') clause2 = Vis().traverse(clause) assert c1 == str(clause) assert str(clause2) == c1 + " SOME MODIFIER=:lala" - assert list(clause.bindparams.keys()) == ['bar'] - assert set(clause2.bindparams.keys()) == set(['bar', 'lala']) + assert list(clause._bindparams.keys()) == ['bar'] + assert set(clause2._bindparams.keys()) == set(['bar', 'lala']) def test_select(self): s2 = select([t1]) diff --git a/test/sql/test_text.py b/test/sql/test_text.py new file mode 100644 index 000000000..af9f8db05 --- /dev/null +++ b/test/sql/test_text.py @@ -0,0 +1,348 @@ +"""Test the TextClause and related constructs.""" + +from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_, assert_raises_message +from sqlalchemy import text, select, Integer, String, Float, \ + bindparam, and_, func, literal_column, exc +from sqlalchemy.types import NullType +from sqlalchemy.sql import table, column + +table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String), +) + +table2 = table( + 'myothertable', + column('otherid', Integer), + column('othername', String), +) + +class CompileTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_basic(self): + self.assert_compile( + text("select * from foo where lala = bar"), + "select * from foo where lala = bar" + ) + +class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL): + """test the usage of text() implicit within the select() construct + when strings are passed.""" + + __dialect__ = 'default' + + def test_select_composition_one(self): + self.assert_compile(select( + ["foobar(a)", "pk_foo_bar(syslaal)"], + "a = 12", + from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] + ), + "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " + "left outer join lala on foobar.foo = lala.foo WHERE a = 12" + ) + + def test_select_composition_two(self): + s = select() + s.append_column("column1") + s.append_column("column2") + s.append_whereclause("column1=12") + s.append_whereclause("column2=19") + s = s.order_by("column1") + s.append_from("table1") + self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE " + "column1=12 AND column2=19 ORDER BY column1") + + def test_select_composition_three(self): + self.assert_compile( + select(["column1", "column2"], + from_obj=table1).alias('somealias').select(), + "SELECT somealias.column1, somealias.column2 FROM " + "(SELECT column1, column2 FROM mytable) AS somealias" + ) + + def test_select_composition_four(self): + # test that use_labels doesnt interfere with literal columns + self.assert_compile( + select(["column1", "column2", table1.c.myid], from_obj=table1, + use_labels=True), + "SELECT column1, column2, mytable.myid AS mytable_myid " + "FROM mytable" + ) + + def test_select_composition_five(self): + # test that use_labels doesnt interfere + # with literal columns that have textual labels + self.assert_compile( + select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], + from_obj=table1, use_labels=True), + "SELECT column1 AS foobar, column2 AS hoho, " + "mytable.myid AS mytable_myid FROM mytable" + ) + + def test_select_composition_six(self): + # test that "auto-labeling of subquery columns" + # doesnt interfere with literal columns, + # exported columns dont get quoted + self.assert_compile( + select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], + from_obj=[table1]).select(), + "SELECT column1 AS foobar, column2 AS hoho, myid FROM " + "(SELECT column1 AS foobar, column2 AS hoho, " + "mytable.myid AS myid FROM mytable)" + ) + + def test_select_composition_seven(self): + self.assert_compile( + select(['col1', 'col2'], from_obj='tablename').alias('myalias'), + "SELECT col1, col2 FROM tablename" + ) + + def test_select_composition_eight(self): + self.assert_compile(select( + [table1.alias('t'), "foo.f"], + "foo.f = t.id", + from_obj=["(select f from bar where lala=heyhey) foo"] + ), + "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, " + "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id") + + def test_select_bundle_columns(self): + self.assert_compile(select( + [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], + and_( + "foo.id = foofoo(lala)", + "datetime(foo) = Today", + table1.c.myid == table2.c.otherid, + ) + ), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, sysdate(), foo, bar, lala " + "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND " + "datetime(foo) = Today AND mytable.myid = myothertable.otherid") + +class BindParamTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_legacy(self): + t = text("select * from foo where lala=:bar and hoho=:whee", + bindparams=[bindparam('bar', 4), bindparam('whee', 7)]) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def test_positional(self): + t = text("select * from foo where lala=:bar and hoho=:whee") + t = t.bindparams(bindparam('bar', 4), bindparam('whee', 7)) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def test_kw(self): + t = text("select * from foo where lala=:bar and hoho=:whee") + t = t.bindparams(bar=4, whee=7) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def test_positional_plus_kw(self): + t = text("select * from foo where lala=:bar and hoho=:whee") + t = t.bindparams(bindparam('bar', 4), whee=7) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def _assert_type_map(self, t, compare): + map_ = dict( + (b.key, b.type) for b in t._bindparams.values() + ) + for k in compare: + assert compare[k]._type_affinity is map_[k]._type_affinity + + def test_typing_construction(self): + t = text("select * from table :foo :bar :bat") + + self._assert_type_map(t, {"foo": NullType(), + "bar": NullType(), + "bat": NullType()}) + + t = t.bindparams(bindparam('foo', type_=String)) + + self._assert_type_map(t, {"foo": String(), + "bar": NullType(), + "bat": NullType()}) + + t = t.bindparams(bindparam('bar', type_=Integer)) + + self._assert_type_map(t, {"foo": String(), + "bar": Integer(), + "bat": NullType()}) + + t = t.bindparams(bat=45.564) + + self._assert_type_map(t, {"foo": String(), + "bar": Integer(), + "bat": Float()}) + + + def test_binds_compiled_named(self): + self.assert_compile( + text("select * from foo where lala=:bar and hoho=:whee"). + bindparams(bar=4, whee=7), + "select * from foo where lala=%(bar)s and hoho=%(whee)s", + checkparams={'bar': 4, 'whee': 7}, + dialect="postgresql" + ) + + def test_binds_compiled_positional(self): + self.assert_compile( + text("select * from foo where lala=:bar and hoho=:whee"). + bindparams(bar=4, whee=7), + "select * from foo where lala=? and hoho=?", + checkparams={'bar': 4, 'whee': 7}, + dialect="sqlite" + ) + + def test_missing_bind_kw(self): + assert_raises_message( + exc.ArgumentError, + "This text\(\) construct doesn't define a bound parameter named 'bar'", + text(":foo").bindparams, + foo=5, bar=7 + ) + + def test_missing_bind_posn(self): + assert_raises_message( + exc.ArgumentError, + "This text\(\) construct doesn't define a bound parameter named 'bar'", + text(":foo").bindparams, + bindparam('foo', value=5), bindparam('bar', value=7) + ) + + def test_escaping_colons(self): + # test escaping out text() params with a backslash + self.assert_compile( + text("select * from foo where clock='05:06:07' " + "and mork='\:mindy'"), + "select * from foo where clock='05:06:07' and mork=':mindy'", + checkparams={}, + params={}, + dialect="postgresql" + ) + + + def test_text_in_select_nonfrom(self): + + generate_series = text("generate_series(:x, :y, :z) as s(a)").\ + bindparams(x=None, y=None, z=None) + + s = select([ + (func.current_date() + literal_column("s.a")).label("dates") + ]).select_from(generate_series) + + self.assert_compile( + s, + "SELECT CURRENT_DATE + s.a AS dates FROM " + "generate_series(:x, :y, :z) as s(a)", + checkparams={'y': None, 'x': None, 'z': None} + ) + + self.assert_compile( + s.params(x=5, y=6, z=7), + "SELECT CURRENT_DATE + s.a AS dates FROM " + "generate_series(:x, :y, :z) as s(a)", + checkparams={'y': 6, 'x': 5, 'z': 7} + ) + +class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_basic_toplevel_resultmap_positional(self): + t = text("select id, name from user").columns( + column('id', Integer), + column('name') + ) + + compiled = t.compile() + eq_( + compiled.result_map, + { + 'id': ('id', (t.c.id,), t.c.id.type), + 'name': ('name', (t.c.name,), t.c.name.type) + } + ) + + def test_basic_toplevel_resultmap(self): + t = text("select id, name from user").columns(id=Integer, name=String) + + compiled = t.compile() + eq_( + compiled.result_map, + { + 'id': ('id', (t.c.id,), t.c.id.type), + 'name': ('name', (t.c.name,), t.c.name.type) + } + ) + + def test_basic_subquery_resultmap(self): + t = text("select id, name from user").columns(id=Integer, name=String) + + stmt = select([table1.c.myid]).select_from( + table1.join(t, table1.c.myid == t.c.id)) + compiled = stmt.compile() + eq_( + compiled.result_map, + { + "myid": ("myid", + (table1.c.myid, "myid", "myid"), table1.c.myid.type), + } + ) + + def test_cte(self): + t = text("select id, name from user").columns(id=Integer, name=String).cte('t') + + s = select([table1]).where(table1.c.myid == t.c.id) + self.assert_compile( + s, + "WITH t AS (select id, name from user) " + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, t WHERE mytable.myid = t.id" + ) + + + def test_alias(self): + t = text("select id, name from user").columns(id=Integer, name=String).alias('t') + + s = select([table1]).where(table1.c.myid == t.c.id) + self.assert_compile( + s, + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, (select id, name from user) AS t " + "WHERE mytable.myid = t.id" + ) + + def test_scalar_subquery(self): + t = text("select id from user").columns(id=Integer) + subq = t.as_scalar() + + assert subq.type._type_affinity is Integer()._type_affinity + + s = select([table1.c.myid, subq]).where(table1.c.myid == subq) + self.assert_compile( + s, + "SELECT mytable.myid, (select id from user) AS anon_1 " + "FROM mytable WHERE mytable.myid = (select id from user)" + ) \ No newline at end of file -- cgit v1.2.1