diff options
Diffstat (limited to 'test/sql/test_compiler.py')
-rw-r--r-- | test/sql/test_compiler.py | 392 |
1 files changed, 89 insertions, 303 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index bdfcccb22..53b9f68fc 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -18,7 +18,7 @@ from sqlalchemy import Integer, String, MetaData, Table, Column, select, \ literal, and_, null, type_coerce, alias, or_, literal_column,\ Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\ intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\ - over, subquery, case + over, subquery, case, true import decimal from sqlalchemy.util import u from sqlalchemy import exc, sql, util, types, schema @@ -272,9 +272,10 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT foo() AS foo_1" ) + # this is native_boolean=False for default dialect self.assert_compile( select([not_(True)], use_labels=True), - "SELECT NOT :param_1" + "SELECT :param_1 = 0" ) self.assert_compile( @@ -852,6 +853,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'otherid_1': 9, 'myid_1': 12} ) + # test a generator + self.assert_compile( + and_( + conj for conj in [ + table1.c.myid == 12, + table1.c.name == 'asdf' + ] + ), + "mytable.myid = :myid_1 AND mytable.name = :name_1" + ) + def test_nested_conjunctions_short_circuit(self): """test that empty or_(), and_() conjunctions are collapsed by an enclosing conjunction.""" @@ -874,6 +886,26 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2" ) + def test_true_short_circuit(self): + t = table('t', column('x')) + + self.assert_compile( + select([t]).where(true()), + "SELECT t.x FROM t WHERE 1 = 1", + dialect=default.DefaultDialect(supports_native_boolean=False) + ) + self.assert_compile( + select([t]).where(true()), + "SELECT t.x FROM t WHERE true", + dialect=default.DefaultDialect(supports_native_boolean=True) + ) + + self.assert_compile( + select([t]), + "SELECT t.x FROM t", + dialect=default.DefaultDialect(supports_native_boolean=True) + ) + def test_distinct(self): self.assert_compile( select([table1.c.myid.distinct()]), @@ -1024,80 +1056,22 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_for_update(self): self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), + table1.select(table1.c.myid == 7).with_for_update(), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=False), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1") - # not supported by dialect, should just use update self.assert_compile( - table1.select(table1.c.myid == 7, for_update='nowait'), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - - # unknown lock mode - self.assert_compile( - table1.select(table1.c.myid == 7, for_update='unknown_mode'), + table1.select(table1.c.myid == 7).with_for_update(nowait=True), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - # ----- mysql - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %s FOR UPDATE", - dialect=mysql.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="read"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", - dialect=mysql.dialect()) - - # ----- oracle - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", - dialect=oracle.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="nowait"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", - dialect=oracle.dialect()) - - # ----- postgresql - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE", - dialect=postgresql.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="nowait"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT", - dialect=postgresql.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="read"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE", - dialect=postgresql.dialect()) + assert_raises_message( + exc.ArgumentError, + "Unknown for_update argument: 'unknown_mode'", + table1.select, table1.c.myid == 7, for_update='unknown_mode' + ) - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="read_nowait"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT", - dialect=postgresql.dialect()) def test_alias(self): # test the alias for a table1. column names stay the same, @@ -1171,172 +1145,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=mysql.dialect() ) - def test_text(self): - self.assert_compile( - text("select * from foo where lala = bar"), - "select * from foo where lala = bar" - ) - - # test bytestring - self.assert_compile(select( - ["foobar(a)", "pk_foo_bar(syslaal)"], - "a = 12", - from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] - ), - "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " - "left outer join lala on foobar.foo = lala.foo WHERE a = 12" - ) - - # test unicode - self.assert_compile(select( - ["foobar(a)", "pk_foo_bar(syslaal)"], - "a = 12", - from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] - ), - "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " - "left outer join lala on foobar.foo = lala.foo WHERE a = 12" - ) - - # test building a select query programmatically with text - s = select() - s.append_column("column1") - s.append_column("column2") - s.append_whereclause("column1=12") - s.append_whereclause("column2=19") - s = s.order_by("column1") - s.append_from("table1") - self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE " - "column1=12 AND column2=19 ORDER BY column1") - - self.assert_compile( - select(["column1", "column2"], - from_obj=table1).alias('somealias').select(), - "SELECT somealias.column1, somealias.column2 FROM " - "(SELECT column1, column2 FROM mytable) AS somealias" - ) - - # test that use_labels doesnt interfere with literal columns - self.assert_compile( - select(["column1", "column2", table1.c.myid], from_obj=table1, - use_labels=True), - "SELECT column1, column2, mytable.myid AS mytable_myid " - "FROM mytable" - ) - - # test that use_labels doesnt interfere - # with literal columns that have textual labels - self.assert_compile( - select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], - from_obj=table1, use_labels=True), - "SELECT column1 AS foobar, column2 AS hoho, " - "mytable.myid AS mytable_myid FROM mytable" - ) - - # test that "auto-labeling of subquery columns" - # doesnt interfere with literal columns, - # exported columns dont get quoted - self.assert_compile( - select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], - from_obj=[table1]).select(), - "SELECT column1 AS foobar, column2 AS hoho, myid FROM " - "(SELECT column1 AS foobar, column2 AS hoho, " - "mytable.myid AS myid FROM mytable)" - ) - - self.assert_compile( - select(['col1', 'col2'], from_obj='tablename').alias('myalias'), - "SELECT col1, col2 FROM tablename" - ) - - def test_binds_in_text(self): - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=:bar and hoho=:whee", - checkparams={'bar': 4, 'whee': 7}, - ) - - self.assert_compile( - text("select * from foo where clock='05:06:07'"), - "select * from foo where clock='05:06:07'", - checkparams={}, - params={}, - ) - - dialect = postgresql.dialect() - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=%(bar)s and hoho=%(whee)s", - checkparams={'bar': 4, 'whee': 7}, - dialect=dialect - ) - - # test escaping out text() params with a backslash - self.assert_compile( - text("select * from foo where clock='05:06:07' " - "and mork='\:mindy'"), - "select * from foo where clock='05:06:07' and mork=':mindy'", - checkparams={}, - params={}, - dialect=dialect - ) - - dialect = sqlite.dialect() - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=? and hoho=?", - checkparams={'bar': 4, 'whee': 7}, - dialect=dialect - ) - - self.assert_compile(select( - [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], - and_( - "foo.id = foofoo(lala)", - "datetime(foo) = Today", - table1.c.myid == table2.c.otherid, - ) - ), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, sysdate(), foo, bar, lala " - "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND " - "datetime(foo) = Today AND mytable.myid = myothertable.otherid") - - self.assert_compile(select( - [alias(table1, 't'), "foo.f"], - "foo.f = t.id", - from_obj=["(select f from bar where lala=heyhey) foo"] - ), - "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, " - "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id") - - # test Text embedded within select_from(), using binds - generate_series = text( - "generate_series(:x, :y, :z) as s(a)", - bindparams=[bindparam('x', None), - bindparam('y', None), bindparam('z', None)] - ) - - s = select([ - (func.current_date() + - literal_column("s.a")).label("dates") - ]).select_from(generate_series) - self.assert_compile( - s, - "SELECT CURRENT_DATE + s.a AS dates FROM " - "generate_series(:x, :y, :z) as s(a)", - checkparams={'y': None, 'x': None, 'z': None} - ) - - self.assert_compile( - s.params(x=5, y=6, z=7), - "SELECT CURRENT_DATE + s.a AS dates FROM " - "generate_series(:x, :y, :z) as s(a)", - checkparams={'y': 6, 'x': 5, 'z': 7} - ) - @testing.emits_warning('.*empty sequence.*') def test_render_binds_as_literal(self): """test a compiler that renders binds inline into @@ -1377,8 +1185,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=dialect ) - assert_raises( + assert_raises_message( exc.CompileError, + "Bind parameter 'foo' without a renderable value not allowed here.", bindparam("foo").in_([]).compile, dialect=dialect ) @@ -1422,58 +1231,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "/ values.val1 > :param_1" ) - def test_collate(self): - for expr in (select([table1.c.name.collate('latin1_german2_ci')]), - select([collate(table1.c.name, 'latin1_german2_ci')])): - self.assert_compile( - expr, "SELECT mytable.name COLLATE latin1_german2_ci " - "AS anon_1 FROM mytable") - - assert table1.c.name.collate('latin1_german2_ci').type is \ - table1.c.name.type - - expr = select([table1.c.name.collate('latin1_german2_ci').\ - label('k1')]).order_by('k1') - self.assert_compile(expr, - "SELECT mytable.name " - "COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1") - - expr = select([collate('foo', 'latin1_german2_ci').label('k1')]) - self.assert_compile(expr, - "SELECT :param_1 COLLATE latin1_german2_ci AS k1") - - expr = select([table1.c.name.collate('latin1_german2_ci').like('%x%')]) - self.assert_compile(expr, - "SELECT mytable.name COLLATE latin1_german2_ci " - "LIKE :param_1 AS anon_1 FROM mytable") - - expr = select([table1.c.name.like(collate('%x%', - 'latin1_german2_ci'))]) - self.assert_compile(expr, - "SELECT mytable.name " - "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 " - "FROM mytable") - - expr = select([table1.c.name.collate('col1').like( - collate('%x%', 'col2'))]) - self.assert_compile(expr, - "SELECT mytable.name COLLATE col1 " - "LIKE :param_1 COLLATE col2 AS anon_1 " - "FROM mytable") - - expr = select([func.concat('a', 'b').\ - collate('latin1_german2_ci').label('x')]) - self.assert_compile(expr, - "SELECT concat(:param_1, :param_2) " - "COLLATE latin1_german2_ci AS x") - - - expr = select([table1.c.name]).\ - order_by(table1.c.name.collate('latin1_german2_ci')) - self.assert_compile(expr, - "SELECT mytable.name FROM mytable ORDER BY " - "mytable.name COLLATE latin1_german2_ci") - def test_percent_chars(self): t = table("table%name", column("percent%"), @@ -2785,10 +2542,6 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): schema.CreateTable(t1).compile ) - # there's some unicode issue in the assertion - # regular expression that appears to be resolved - # in 2.6, not exactly sure what it is - @testing.requires.python26 def test_reraise_of_column_spec_issue_unicode(self): MyType = self._illegal_type_fixture() t1 = Table('t', MetaData(), @@ -2800,6 +2553,22 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): schema.CreateTable(t1).compile ) + def test_system_flag(self): + m = MetaData() + t = Table('t', m, Column('x', Integer), + Column('y', Integer, system=True), + Column('z', Integer)) + self.assert_compile( + schema.CreateTable(t), + "CREATE TABLE t (x INTEGER, z INTEGER)" + ) + m2 = MetaData() + t2 = t.tometadata(m2) + self.assert_compile( + schema.CreateTable(t2), + "CREATE TABLE t (x INTEGER, z INTEGER)" + ) + class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -2909,6 +2678,7 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): "(:rem_id, :datatype_id, :value)") + class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -3238,13 +3008,34 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): ) class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = default.DefaultDialect(supports_native_boolean=True) def _fixture(self): m = MetaData() return Table('foo', m, Column('id', Integer)) + bool_table = table('t', column('x', Boolean)) + + def test_coerce_bool_where(self): + self.assert_compile( + select([self.bool_table]).where(self.bool_table.c.x), + "SELECT t.x FROM t WHERE t.x" + ) + + def test_coerce_bool_where_non_native(self): + self.assert_compile( + select([self.bool_table]).where(self.bool_table.c.x), + "SELECT t.x FROM t WHERE t.x = 1", + dialect=default.DefaultDialect(supports_native_boolean=False) + ) + + self.assert_compile( + select([self.bool_table]).where(~self.bool_table.c.x), + "SELECT t.x FROM t WHERE t.x = 0", + dialect=default.DefaultDialect(supports_native_boolean=False) + ) + def test_null_constant(self): self.assert_compile(_literal_as_text(None), "NULL") @@ -3257,12 +3048,12 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): def test_val_and_false(self): t = self._fixture() self.assert_compile(and_(t.c.id == 1, False), - "foo.id = :id_1 AND false") + "false") def test_val_and_true_coerced(self): t = self._fixture() self.assert_compile(and_(t.c.id == 1, True), - "foo.id = :id_1 AND true") + "foo.id = :id_1") def test_val_is_null_coerced(self): t = self._fixture() @@ -3270,26 +3061,21 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): "foo.id IS NULL") def test_val_and_None(self): - # current convention is None in and_() or - # other clauselist is ignored. May want - # to revise this at some point. t = self._fixture() self.assert_compile(and_(t.c.id == 1, None), - "foo.id = :id_1") + "foo.id = :id_1 AND NULL") def test_None_and_val(self): - # current convention is None in and_() or - # other clauselist is ignored. May want - # to revise this at some point. t = self._fixture() - self.assert_compile(and_(t.c.id == 1, None), - "foo.id = :id_1") + self.assert_compile(and_(None, t.c.id == 1), + "NULL AND foo.id = :id_1") def test_None_and_nothing(self): # current convention is None in and_() # returns None May want # to revise this at some point. - assert and_(None) is None + self.assert_compile( + and_(None), "NULL") def test_val_and_null(self): t = self._fixture() |