diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
| commit | 45cec095b4904ba71425d2fe18c143982dd08f43 (patch) | |
| tree | af5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/sql/test_select.py | |
| parent | 698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff) | |
| download | sqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz | |
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run
the tests. [ticket:970]
Diffstat (limited to 'test/sql/test_select.py')
| -rw-r--r-- | test/sql/test_select.py | 1550 |
1 files changed, 1550 insertions, 0 deletions
diff --git a/test/sql/test_select.py b/test/sql/test_select.py new file mode 100644 index 000000000..1d9e531de --- /dev/null +++ b/test/sql/test_select.py @@ -0,0 +1,1550 @@ +from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message +import datetime, re, operator +from sqlalchemy import * +from sqlalchemy import exc, sql, util +from sqlalchemy.sql import table, column, label, compiler +from sqlalchemy.sql.expression import ClauseList +from sqlalchemy.engine import default +from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql +from sqlalchemy.test import * + +table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String), +) + +table2 = table( + 'myothertable', + column('otherid', Integer), + column('othername', String), +) + +table3 = table( + 'thirdtable', + column('userid', Integer), + column('otherstuff', String), +) + +metadata = MetaData() +table4 = Table( + 'remotetable', metadata, + Column('rem_id', Integer, primary_key=True), + Column('datatype_id', Integer), + Column('value', String(20)), + schema = 'remote_owner' +) + +users = table('users', + column('user_id'), + column('user_name'), + column('password'), +) + +addresses = table('addresses', + column('address_id'), + column('user_id'), + column('street'), + column('city'), + column('state'), + column('zip') +) + +class SelectTest(TestBase, AssertsCompiledSQL): + + def test_attribute_sanity(self): + assert hasattr(table1, 'c') + assert hasattr(table1.select(), 'c') + assert not hasattr(table1.c.myid.self_group(), 'columns') + assert hasattr(table1.select().self_group(), 'columns') + assert not hasattr(select([table1.c.myid]).as_scalar().self_group(), 'columns') + assert not hasattr(table1.c.myid, 'columns') + assert not hasattr(table1.c.myid, 'c') + assert not hasattr(table1.select().c.myid, 'c') + assert not hasattr(table1.select().c.myid, 'columns') + assert not hasattr(table1.alias().c.myid, 'columns') + assert not hasattr(table1.alias().c.myid, 'c') + + def test_table_select(self): + self.assert_compile(table1.select(), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable") + + self.assert_compile(select([table1, table2]), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, \ +myothertable.othername FROM mytable, myothertable") + + def test_from_subquery(self): + """tests placing select statements in the column clause of another select, for the + purposes of selecting from the exported columns of that select.""" + + s = select([table1], table1.c.name == 'jack') + self.assert_compile( + select( + [s], + s.c.myid == 7 + ) + , + "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable "\ + "WHERE mytable.name = :name_1) WHERE myid = :myid_1") + + sq = select([table1]) + self.assert_compile( + sq.select(), + "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)" + ) + + sq = select( + [table1], + ).alias('sq') + + self.assert_compile( + sq.select(sq.c.myid == 7), + "SELECT sq.myid, sq.name, sq.description FROM \ +(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :myid_1" + ) + + sq = select( + [table1, table2], + and_(table1.c.myid ==7, table2.c.otherid==table1.c.myid), + use_labels = True + ).alias('sq') + + sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \ +mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \ +myothertable.othername AS myothertable_othername FROM mytable, myothertable \ +WHERE mytable.myid = :myid_1 AND myothertable.otherid = mytable.myid" + + self.assert_compile(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \ +sq.myothertable_othername FROM (" + sqstring + ") AS sq") + + sq2 = select( + [sq], + use_labels = True + ).alias('sq2') + + self.assert_compile(sq2.select(), "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, sq2.sq_mytable_description, \ +sq2.sq_myothertable_otherid, sq2.sq_myothertable_othername FROM \ +(SELECT sq.mytable_myid AS sq_mytable_myid, sq.mytable_name AS sq_mytable_name, \ +sq.mytable_description AS sq_mytable_description, sq.myothertable_otherid AS sq_myothertable_otherid, \ +sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") AS sq) AS sq2") + + def test_select_from_clauselist(self): + self.assert_compile( + select([ClauseList(column('a'), column('b'))]).select_from('sometable'), + 'SELECT a, b FROM sometable' + ) + + def test_use_labels(self): + self.assert_compile( + select([table1.c.myid==5], use_labels=True), + "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable" + ) + + self.assert_compile( + select([func.foo()], use_labels=True), + "SELECT foo() AS foo_1" + ) + + self.assert_compile( + select([not_(True)], use_labels=True), + "SELECT NOT :param_1" # TODO: should this make an anon label ?? + ) + + self.assert_compile( + select([cast("data", sqlite.SLInteger)], use_labels=True), # this will work with plain Integer in 0.6 + "SELECT CAST(:param_1 AS INTEGER) AS anon_1" + ) + + + + def test_nested_uselabels(self): + """test nested anonymous label generation. this + essentially tests the ANONYMOUS_LABEL regex. + + """ + s1 = table1.select() + s2 = s1.alias() + s3 = select([s2], use_labels=True) + s4 = s3.alias() + s5 = select([s4], use_labels=True) + self.assert_compile(s5, "SELECT anon_1.anon_2_myid AS anon_1_anon_2_myid, anon_1.anon_2_name AS anon_1_anon_2_name, "\ + "anon_1.anon_2_description AS anon_1_anon_2_description FROM (SELECT anon_2.myid AS anon_2_myid, anon_2.name AS anon_2_name, "\ + "anon_2.description AS anon_2_description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description "\ + "AS description FROM mytable) AS anon_2) AS anon_1") + + def test_dont_overcorrelate(self): + self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") + + def test_full_correlate(self): + # intentional + t = table('t', column('a'), column('b')) + s = select([t.c.a]).where(t.c.a==1).correlate(t).as_scalar() + + s2 = select([t.c.a, s]) + self.assert_compile(s2, """SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t""") + + # unintentional + t2 = table('t2', column('c'), column('d')) + s = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar() + s2 =select([t, t2, s]) + assert_raises(exc.InvalidRequestError, str, s2) + + # intentional again + s = s.correlate(t, t2) + s2 =select([t, t2, s]) + self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d") + + def test_exists(self): + self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", params={'mytable_myid':5}) + + self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) + + self.assert_compile(select([table1, exists([1], from_obj=table2).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={}) + + self.assert_compile( + table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = mytable.myid)" + ) + + self.assert_compile( + table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = mytable.myid)" + ) + + self.assert_compile( + table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)).replace_selectable(table2, table2.alias()), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" + ) + + self.assert_compile( + table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)" + ) + + self.assert_compile( + select([ + or_( + exists().where(table2.c.otherid=='foo'), + exists().where(table2.c.otherid=='bar') + ) + ]), + "SELECT (EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_1)) "\ + "OR (EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_2)) AS anon_1" + ) + + + def test_where_subquery(self): + s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') + self.assert_compile( + select([users, s.c.street], from_obj=s), + """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") + + self.assert_compile( + table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :name_1)" + ) + + self.assert_compile( + table1.select(table1.c.myid == select([table2.c.otherid], table1.c.name == table2.c.othername)), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT myothertable.otherid FROM myothertable WHERE mytable.name = myothertable.othername)" + ) + + self.assert_compile( + table1.select(exists([1], table2.c.otherid == table1.c.myid)), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" + ) + + + talias = table1.alias('ta') + s = subquery('sq2', [talias], exists([1], table2.c.otherid == talias.c.myid)) + self.assert_compile( + select([s, table1]) + ,"SELECT sq2.myid, sq2.name, sq2.description, mytable.myid, mytable.name, mytable.description FROM (SELECT ta.myid AS myid, ta.name AS name, ta.description AS description FROM mytable AS ta WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = ta.myid)) AS sq2, mytable") + + s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s') + self.assert_compile( + select([users, s.c.street], from_obj=s), + """SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""") + + # test constructing the outer query via append_column(), which occurs in the ORM's Query object + s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=table1) + s.append_column(table1) + self.assert_compile( + s, + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)" + ) + + + def test_orderby_subquery(self): + self.assert_compile( + table1.select(order_by=[select([table2.c.otherid], table1.c.myid==table2.c.otherid)]), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid)" + ) + self.assert_compile( + table1.select(order_by=[desc(select([table2.c.otherid], table1.c.myid==table2.c.otherid))]), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid) DESC" + ) + + @testing.uses_deprecated('scalar option') + def test_scalar_select(self): + try: + s = select([table1.c.myid, table1.c.name]).as_scalar() + assert False + except exc.InvalidRequestError, err: + assert str(err) == "Scalar select can only be created from a Select object that has exactly one column expression.", str(err) + + try: + # generic function which will look at the type of expression + func.coalesce(select([table1.c.myid])) + assert False + except exc.InvalidRequestError, err: + assert str(err) == "Select objects don't have a type. Call as_scalar() on this Select object to return a 'scalar' version of this Select.", str(err) + + s = select([table1.c.myid], scalar=True, correlate=False) + self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") + + s = select([table1.c.myid], scalar=True) + self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") + + s = select([table1.c.myid]).correlate(None).as_scalar() + self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") + + # test that aliases use as_scalar() when used in an explicitly scalar context + s = select([table1.c.myid]).alias() + self.assert_compile(select([table1.c.myid]).where(table1.c.myid==s), "SELECT mytable.myid FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable)") + self.assert_compile(select([table1.c.myid]).where(s > table1.c.myid), "SELECT mytable.myid FROM mytable WHERE mytable.myid < (SELECT mytable.myid FROM mytable)") + + + s = select([table1.c.myid]).as_scalar() + self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") + + # test expressions against scalar selects + self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :param_1 AS anon_1") + self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :param_1 AS anon_1") + self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :param_1 AS anon_1") + + self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") + + # scalar selects should not have any attributes on their 'c' or 'columns' attribute + s = select([table1.c.myid]).as_scalar() + try: + s.c.foo + except exc.InvalidRequestError, err: + assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.' + + try: + s.columns.foo + except exc.InvalidRequestError, err: + assert str(err) == 'Scalar Select expression has no columns; use this object directly within a column-level expression.' + + zips = table('zips', + column('zipcode'), + column('latitude'), + column('longitude'), + ) + places = table('places', + column('id'), + column('nm') + ) + zip = '12345' + qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar() + qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar() + + q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')], + zips.c.zipcode==zip, + order_by = ['dist', places.c.nm] + ) + + self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " + "zips.zipcode = :zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zipcode_2)) AS dist " + "FROM places, zips WHERE zips.zipcode = :zipcode_3 ORDER BY dist, places.nm") + + zalias = zips.alias('main_zip') + qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) + qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) + q = select([places.c.id, places.c.nm, zalias.c.zipcode, func.latlondist(qlat, qlng).label('dist')], + order_by = ['dist', places.c.nm] + ) + self.assert_compile(q, "SELECT places.id, places.nm, main_zip.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE zips.zipcode = main_zip.zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = main_zip.zipcode)) AS dist FROM places, zips AS main_zip ORDER BY dist, places.nm") + + a1 = table2.alias('t2alias') + s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True) + j1 = table1.join(table2, table1.c.myid==table2.c.otherid) + s2 = select([table1, s1], from_obj=j1) + self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") + + def test_label_comparison(self): + x = func.lala(table1.c.myid).label('foo') + self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1") + + self.assert_compile(label('bar', column('foo', type_=String)) + "foo", "foo || :param_1") + + + def test_conjunctions(self): + a, b, c = 'a', 'b', 'c' + x = and_(a, b, c) + assert isinstance(x.type, Boolean) + assert str(x) == 'a AND b AND c' + self.assert_compile( + select([x.label('foo')]), + 'SELECT a AND b AND c AS foo' + ) + + self.assert_compile( + and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()"), + "mytable.myid = :myid_1 AND mytable.name = :name_1 "\ + "AND myothertable.othername = :othername_1 AND sysdate() = today()" + ) + + self.assert_compile( + and_( + table1.c.myid == 12, + or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9), + "sysdate() = today()", + ), + "mytable.myid = :myid_1 AND (myothertable.othername = :othername_1 OR "\ + "myothertable.othername = :othername_2 OR myothertable.otherid = :otherid_1) AND sysdate() = today()", + checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12} + ) + + + def test_distinct(self): + self.assert_compile( + select([table1.c.myid.distinct()]), "SELECT DISTINCT mytable.myid FROM mytable" + ) + + self.assert_compile( + select([distinct(table1.c.myid)]), "SELECT DISTINCT mytable.myid FROM mytable" + ) + + self.assert_compile( + select([table1.c.myid]).distinct(), "SELECT DISTINCT mytable.myid FROM mytable" + ) + + self.assert_compile( + select([func.count(table1.c.myid.distinct())]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" + ) + + self.assert_compile( + select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" + ) + + def test_operators(self): + for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), + (operator.sub, '-'), (operator.div, '/'), + ): + for (lhs, rhs, res) in ( + (5, table1.c.myid, ':myid_1 %s mytable.myid'), + (5, literal(5), ':param_1 %s :param_2'), + (table1.c.myid, 'b', 'mytable.myid %s :myid_1'), + (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'), + (table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'), + (literal(5), 8, ':param_1 %s :param_2'), + (literal(6), table1.c.myid, ':param_1 %s mytable.myid'), + (literal(7), literal(5.5), ':param_1 %s :param_2'), + ): + self.assert_compile(py_op(lhs, rhs), res % sql_op) + + dt = datetime.datetime.today() + # exercise comparison operators + for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'), + (operator.gt, '>', '<'), + (operator.eq, '=', '='), + (operator.ne, '!=', '!='), + (operator.le, '<=', '>='), + (operator.ge, '>=', '<=')): + for (lhs, rhs, l_sql, r_sql) in ( + ('a', table1.c.myid, ':myid_1', 'mytable.myid'), + ('a', literal('b'), ':param_2', ':param_1'), # note swap! + (table1.c.myid, 'b', 'mytable.myid', ':myid_1'), + (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'), + (table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'), + (literal('a'), 'b', ':param_1', ':param_2'), + (literal('a'), table1.c.myid, ':param_1', 'mytable.myid'), + (literal('a'), literal('b'), ':param_1', ':param_2'), + (dt, literal('b'), ':param_2', ':param_1'), + (literal('b'), dt, ':param_1', ':param_2'), + ): + + # the compiled clause should match either (e.g.): + # 'a' < 'b' -or- 'b' > 'a'. + compiled = str(py_op(lhs, rhs)) + fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql) + rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql) + + self.assert_(compiled == fwd_sql or compiled == rev_sql, + "\n'" + compiled + "'\n does not match\n'" + + fwd_sql + "'\n or\n'" + rev_sql + "'") + + self.assert_compile( + table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND mytable.name != :name_1" + ) + + self.assert_compile( + table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND "\ + "NOT (mytable.name BETWEEN :name_1 AND :name_2)" + ) + + self.assert_compile( + table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND "\ + "NOT (mytable.name = :name_1 AND mytable.name = :name_2 AND mytable.name = :name_3)" + ) + + self.assert_compile( + table1.select((table1.c.myid != 12) & ~table1.c.name), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :myid_1 AND NOT mytable.name" + ) + + self.assert_compile( + literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3" + ) + + # test the op() function, also that its results are further usable in expressions + self.assert_compile( + table1.select(table1.c.myid.op('hoho')(12)==14), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :myid_1) = :param_1" + ) + + # test that clauses can be pickled (operators need to be module-level, etc.) + clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & table1.c.myid.like('hoho') + assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause))) + + + def test_like(self): + for expr, check, dialect in [ + (table1.c.myid.like('somstr'), "mytable.myid LIKE :myid_1", None), + (~table1.c.myid.like('somstr'), "mytable.myid NOT LIKE :myid_1", None), + (table1.c.myid.like('somstr', escape='\\'), "mytable.myid LIKE :myid_1 ESCAPE '\\'", None), + (~table1.c.myid.like('somstr', escape='\\'), "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'", None), + (table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'", None), + (~table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'", None), + (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()), + (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()), + (table1.c.name.ilike('%something%'), "lower(mytable.name) LIKE lower(:name_1)", None), + (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(name_1)s", postgres.PGDialect()), + (~table1.c.name.ilike('%something%'), "lower(mytable.name) NOT LIKE lower(:name_1)", None), + (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgres.PGDialect()), + ]: + self.assert_compile(expr, check, dialect=dialect) + + def test_match(self): + for expr, check, dialect in [ + (table1.c.myid.match('somstr'), "mytable.myid MATCH ?", sqlite.SQLiteDialect()), + (table1.c.myid.match('somstr'), "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", mysql.MySQLDialect()), + (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", mssql.MSSQLDialect()), + (table1.c.myid.match('somstr'), "mytable.myid @@ to_tsquery(%(myid_1)s)", postgres.PGDialect()), + (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", oracle.OracleDialect()), + ]: + self.assert_compile(expr, check, dialect=dialect) + + def test_composed_string_comparators(self): + self.assert_compile( + table1.c.name.contains('jo'), "mytable.name LIKE '%%' || :name_1 || '%%'" , checkparams = {'name_1': u'jo'}, + ) + self.assert_compile( + table1.c.name.contains('jo'), "mytable.name LIKE concat(concat('%%', %s), '%%')" , checkparams = {'name_1': u'jo'}, + dialect=mysql.dialect() + ) + self.assert_compile( + table1.c.name.contains('jo', escape='\\'), "mytable.name LIKE '%%' || :name_1 || '%%' ESCAPE '\\'" , checkparams = {'name_1': u'jo'}, + ) + self.assert_compile( table1.c.name.startswith('jo', escape='\\'), "mytable.name LIKE :name_1 || '%%' ESCAPE '\\'" ) + self.assert_compile( table1.c.name.endswith('jo', escape='\\'), "mytable.name LIKE '%%' || :name_1 ESCAPE '\\'" ) + self.assert_compile( table1.c.name.endswith('hn'), "mytable.name LIKE '%%' || :name_1", checkparams = {'name_1': u'hn'}, ) + self.assert_compile( + table1.c.name.endswith('hn'), "mytable.name LIKE concat('%%', %s)", + checkparams = {'name_1': u'hn'}, dialect=mysql.dialect() + ) + self.assert_compile( + table1.c.name.startswith(u"hi \xf6 \xf5"), "mytable.name LIKE :name_1 || '%%'", + checkparams = {'name_1': u'hi \xf6 \xf5'}, + ) + self.assert_compile(column('name').endswith(text("'foo'")), "name LIKE '%%' || 'foo'" ) + self.assert_compile(column('name').endswith(literal_column("'foo'")), "name LIKE '%%' || 'foo'" ) + self.assert_compile(column('name').startswith(text("'foo'")), "name LIKE 'foo' || '%%'" ) + self.assert_compile(column('name').startswith(text("'foo'")), "name LIKE concat('foo', '%%')", dialect=mysql.dialect()) + self.assert_compile(column('name').startswith(literal_column("'foo'")), "name LIKE 'foo' || '%%'" ) + self.assert_compile(column('name').startswith(literal_column("'foo'")), "name LIKE concat('foo', '%%')", dialect=mysql.dialect()) + + def test_multiple_col_binds(self): + self.assert_compile( + select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')), + "SELECT * FROM mytable WHERE mytable.myid = :myid_1 OR mytable.myid = :myid_2 OR mytable.myid = :myid_3" + ) + + def test_orderby_groupby(self): + self.assert_compile( + table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]), + "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC" + ) + + self.assert_compile( + table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]), + "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC" + ) + + # generative order_by + self.assert_compile( + table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()), + "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC" + ) + + self.assert_compile( + table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None), + "SELECT myothertable.otherid, myothertable.othername FROM myothertable" + ) + + self.assert_compile( + select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername" + ) + + # generative group by + self.assert_compile( + select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername" + ) + + self.assert_compile( + select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername).group_by(None), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable" + ) + + self.assert_compile( + select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]), + "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername" + ) + + def test_for_update(self): + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect()) + + self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect()) + + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect()) + + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", dialect=oracle.dialect()) + + def test_alias(self): + # test the alias for a table1. column names stay the same, table name "changes" to "foo". + self.assert_compile( + select([table1.alias('foo')]) + ,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo") + + for dialect in (firebird.dialect(), oracle.dialect()): + self.assert_compile( + select([table1.alias('foo')]) + ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo" + ,dialect=dialect) + + self.assert_compile( + select([table1.alias()]) + ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1") + + # create a select for a join of two tables. use_labels means the column names will have + # labels tablename_columnname, which become the column keys accessible off the Selectable object. + # also, only use one column from the second table and all columns from the first table1. + q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True) + + # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view". + a = alias(q, 't2view') + + # select from that alias, also using labels. two levels of labels should produce two underscores. + # also, reference the column "mytable_myid" off of the t2view alias. + self.assert_compile( + a.select(a.c.mytable_myid == 9, use_labels = True), + "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \ +t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \ +(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \ +myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \ +WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1" + ) + + + def test_prefixes(self): + self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), + "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable" + ) + + def test_text(self): + self.assert_compile( + text("select * from foo where lala = bar") , + "select * from foo where lala = bar" + ) + + # test bytestring + self.assert_compile(select( + ["foobar(a)", "pk_foo_bar(syslaal)"], + "a = 12", + from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"] + ), + "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") + + # test unicode + self.assert_compile(select( + [u"foobar(a)", u"pk_foo_bar(syslaal)"], + u"a = 12", + from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"] + ), + u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12") + + # test building a select query programmatically with text + s = select() + s.append_column("column1") + s.append_column("column2") + s.append_whereclause("column1=12") + s.append_whereclause("column2=19") + s = s.order_by("column1") + s.append_from("table1") + self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1") + + self.assert_compile( + select(["column1", "column2"], from_obj=table1).alias('somealias').select(), + "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias" + ) + + # test that use_labels doesnt interfere with literal columns + self.assert_compile( + select(["column1", "column2", table1.c.myid], from_obj=table1, use_labels=True), + "SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable" + ) + + # test that use_labels doesnt interfere with literal columns that have textual labels + self.assert_compile( + select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=table1, use_labels=True), + "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable" + ) + + print "---------------------------------------------" + s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]) + print "---------------------------------------------" + # test that "auto-labeling of subquery columns" doesnt interfere with literal columns, + # exported columns dont get quoted + self.assert_compile( + select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(), + "SELECT column1 AS foobar, column2 AS hoho, myid FROM (SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)" + ) + + self.assert_compile( + select(['col1','col2'], from_obj='tablename').alias('myalias'), + "SELECT col1, col2 FROM tablename" + ) + + def test_binds_in_text(self): + self.assert_compile( + text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar':4, 'whee': 7}, + ) + + self.assert_compile( + text("select * from foo where clock='05:06:07'"), + "select * from foo where clock='05:06:07'", + checkparams={}, + params={}, + ) + + dialect = postgres.dialect() + self.assert_compile( + text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), + "select * from foo where lala=%(bar)s and hoho=%(whee)s", + checkparams={'bar':4, 'whee': 7}, + dialect=dialect + ) + + # test escaping out text() params with a backslash + self.assert_compile( + text("select * from foo where clock='05:06:07' and mork='\:mindy'"), + "select * from foo where clock='05:06:07' and mork=':mindy'", + checkparams={}, + params={}, + dialect=dialect + ) + + dialect = sqlite.dialect() + self.assert_compile( + text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), + "select * from foo where lala=? and hoho=?", + checkparams={'bar':4, 'whee':7}, + dialect=dialect + ) + + self.assert_compile(select( + [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], + and_( + "foo.id = foofoo(lala)", + "datetime(foo) = Today", + table1.c.myid == table2.c.otherid, + ) + ), + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \ +FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid") + + self.assert_compile(select( + [alias(table1, 't'), "foo.f"], + "foo.f = t.id", + from_obj = ["(select f from bar where lala=heyhey) foo"] + ), + "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id") + + # test Text embedded within select_from(), using binds + generate_series = text("generate_series(:x, :y, :z) as s(a)", bindparams=[bindparam('x'), bindparam('y'), bindparam('z')]) + + s =select([(func.current_date() + literal_column("s.a")).label("dates")]).select_from(generate_series) + self.assert_compile(s, "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': None, 'x': None, 'z': None}) + + self.assert_compile(s.params(x=5, y=6, z=7), "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': 6, 'x': 5, 'z': 7}) + + + def test_literal(self): + + self.assert_compile(select([literal('foo')]), "SELECT :param_1") + + self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), + "SELECT :param_1 || :param_2 AS anon_1 FROM mytable") + + def test_calculated_columns(self): + value_tbl = table('values', + column('id', Integer), + column('val1', Float), + column('val2', Float), + ) + + self.assert_compile( + select([value_tbl.c.id, (value_tbl.c.val2 - + value_tbl.c.val1)/value_tbl.c.val1]), + "SELECT values.id, (values.val2 - values.val1) / values.val1 AS anon_1 FROM values" + ) + + self.assert_compile( + select([value_tbl.c.id], (value_tbl.c.val2 - + value_tbl.c.val1)/value_tbl.c.val1 > 2.0), + "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :param_1" + ) + + self.assert_compile( + select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0), + "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :param_1" + ) + + def test_collate(self): + for expr in (select([table1.c.name.collate('latin1_german2_ci')]), + select([collate(table1.c.name, 'latin1_german2_ci')])): + self.assert_compile( + expr, "SELECT mytable.name COLLATE latin1_german2_ci AS anon_1 FROM mytable") + + assert table1.c.name.collate('latin1_german2_ci').type is table1.c.name.type + + expr = select([table1.c.name.collate('latin1_german2_ci').label('k1')]).order_by('k1') + self.assert_compile(expr,"SELECT mytable.name COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1") + + expr = select([collate('foo', 'latin1_german2_ci').label('k1')]) + self.assert_compile(expr,"SELECT :param_1 COLLATE latin1_german2_ci AS k1") + + expr = select([table1.c.name.collate('latin1_german2_ci').like('%x%')]) + self.assert_compile(expr, + "SELECT mytable.name COLLATE latin1_german2_ci " + "LIKE :param_1 AS anon_1 FROM mytable") + + expr = select([table1.c.name.like(collate('%x%', 'latin1_german2_ci'))]) + self.assert_compile(expr, + "SELECT mytable.name " + "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 " + "FROM mytable") + + expr = select([table1.c.name.collate('col1').like( + collate('%x%', 'col2'))]) + self.assert_compile(expr, + "SELECT mytable.name COLLATE col1 " + "LIKE :param_1 COLLATE col2 AS anon_1 " + "FROM mytable") + + expr = select([func.concat('a', 'b').collate('latin1_german2_ci').label('x')]) + self.assert_compile(expr, + "SELECT concat(:param_1, :param_2) " + "COLLATE latin1_german2_ci AS x") + + + expr = select([table1.c.name]).order_by(table1.c.name.collate('latin1_german2_ci')) + self.assert_compile(expr, "SELECT mytable.name FROM mytable ORDER BY mytable.name COLLATE latin1_german2_ci") + + def test_percent_chars(self): + t = table("table%name", + column("percent%"), + column("%(oneofthese)s"), + column("spaces % more spaces"), + ) + self.assert_compile( + t.select(use_labels=True), + '''SELECT "table%name"."percent%" AS "table%name_percent%", '''\ + '''"table%name"."%(oneofthese)s" AS "table%name_%(oneofthese)s", '''\ + '''"table%name"."spaces % more spaces" AS "table%name_spaces % more spaces" FROM "table%name"''' + ) + + + def test_joins(self): + self.assert_compile( + join(table2, table1, table1.c.myid == table2.c.otherid).select(), + "SELECT myothertable.otherid, myothertable.othername, mytable.myid, mytable.name, \ +mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertable.otherid" + ) + + self.assert_compile( + select( + [table1], + from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid)] + ), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") + + self.assert_compile( + select( + [join(join(table1, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid == table3.c.userid)] + ), + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid" + ) + + self.assert_compile( + join(users, addresses, users.c.user_id==addresses.c.user_id).select(), + "SELECT users.user_id, users.user_name, users.password, addresses.address_id, addresses.user_id, addresses.street, addresses.city, addresses.state, addresses.zip FROM users JOIN addresses ON users.user_id = addresses.user_id" + ) + + self.assert_compile( + select([table1, table2, table3], + + from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).outerjoin(table3, table1.c.myid==table3.c.userid)] + + #from_obj = [outerjoin(join(table, table2, table1.c.myid == table2.c.otherid), table3, table1.c.myid==table3.c.userid)] + ) + ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid = thirdtable.userid" + ) + self.assert_compile( + select([table1, table2, table3], + from_obj = [outerjoin(table1, join(table2, table3, table2.c.otherid == table3.c.userid), table1.c.myid==table2.c.otherid)] + ) + ,"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON mytable.myid = myothertable.otherid" + ) + + query = select( + [table1, table2], + or_( + table1.c.name == 'fred', + table1.c.myid == 10, + table2.c.othername != 'jack', + "EXISTS (select yay from foo where boo = lar)" + ), + from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ] + ) + self.assert_compile(query, + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ +FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \ +WHERE mytable.name = :name_1 OR mytable.myid = :myid_1 OR \ +myothertable.othername != :othername_1 OR \ +EXISTS (select yay from foo where boo = lar)", + ) + + def test_compound_selects(self): + try: + union(table3.select(), table1.select()) + except exc.ArgumentError, err: + assert str(err) == "All selectables passed to CompoundSelect must have identical numbers of columns; select #1 has 2 columns, select #2 has 3" + + x = union( + select([table1], table1.c.myid == 5), + select([table1], table1.c.myid == 12), + order_by = [table1.c.myid], + ) + + self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \ +FROM mytable WHERE mytable.myid = :myid_1 UNION \ +SELECT mytable.myid, mytable.name, mytable.description \ +FROM mytable WHERE mytable.myid = :myid_2 ORDER BY mytable.myid") + + u1 = union( + select([table1.c.myid, table1.c.name]), + select([table2]), + select([table3]) + ) + self.assert_compile(u1, + "SELECT mytable.myid, mytable.name \ +FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \ +FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable") + + assert u1.corresponding_column(table2.c.otherid) is u1.c.myid + + # TODO - why is there an extra space before the LIMIT ? + self.assert_compile( + union( + select([table1.c.myid, table1.c.name]), + select([table2]), + order_by=['myid'], + offset=10, + limit=5 + ) + , "SELECT mytable.myid, mytable.name \ +FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \ +FROM myothertable ORDER BY myid LIMIT 5 OFFSET 10" + ) + + self.assert_compile( + union( + select([table1.c.myid, table1.c.name, func.max(table1.c.description)], table1.c.name=='name2', group_by=[table1.c.myid, table1.c.name]), + table1.select(table1.c.name=='name1') + ) + , + "SELECT mytable.myid, mytable.name, max(mytable.description) AS max_1 FROM mytable \ +WHERE mytable.name = :name_1 GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \ +FROM mytable WHERE mytable.name = :name_2" + ) + + self.assert_compile( + union( + select([literal(100).label('value')]), + select([literal(200).label('value')]) + ), + "SELECT :param_1 AS value UNION SELECT :param_2 AS value" + ) + + self.assert_compile( + union_all( + select([table1.c.myid]), + union( + select([table2.c.otherid]), + select([table3.c.userid]), + ) + ) + , + "SELECT mytable.myid FROM mytable UNION ALL (SELECT myothertable.otherid FROM myothertable UNION \ +SELECT thirdtable.userid FROM thirdtable)" + ) + # This doesn't need grouping, so don't group to not give sqlite unnecessarily hard time + self.assert_compile( + union( + except_( + select([table2.c.otherid]), + select([table3.c.userid]), + ), + select([table1.c.myid]) + ) + , + "SELECT myothertable.otherid FROM myothertable EXCEPT SELECT thirdtable.userid FROM thirdtable \ +UNION SELECT mytable.myid FROM mytable" + ) + + s = select([column('foo'), column('bar')]) + s = union(s, s) + s = union(s, s) + self.assert_compile(s, "SELECT foo, bar UNION SELECT foo, bar UNION (SELECT foo, bar UNION SELECT foo, bar)") + + s = select([column('foo'), column('bar')]) + # ORDER BY's even though not supported by all DB's, are rendered if requested + self.assert_compile(union(s.order_by("foo"), s.order_by("bar")), + "SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar" + ) + # self_group() is honored + self.assert_compile(union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()), + "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT 10)" + ) + + + @testing.uses_deprecated() + def test_binds(self): + for ( + stmt, + expected_named_stmt, + expected_positional_stmt, + expected_default_params_dict, + expected_default_params_list, + test_param_dict, + expected_test_params_dict, + expected_test_params_list + ) in [ + ( + select( + [table1, table2], + and_( + table1.c.myid == table2.c.otherid, + table1.c.name == bindparam('mytablename') + )), + """SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = :mytablename""", + """SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid AND mytable.name = ?""", + {'mytablename':None}, [None], + {'mytablename':5}, {'mytablename':5}, [5] + ), + ( + select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myid'))), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid", + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?", + {'myid':None}, [None, None], + {'myid':5}, {'myid':5}, [5,5] + ), + ( + text("SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid"), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid", + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?", + {'myid':None}, [None, None], + {'myid':5}, {'myid':5}, [5,5] + ), + ( + select([table1], or_(table1.c.myid==bindparam('myid', unique=True), table2.c.otherid==bindparam('myid', unique=True))), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2", + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?", + {'myid_1':None, 'myid_2':None}, [None, None], + {'myid_1':5, 'myid_2': 6}, {'myid_1':5, 'myid_2':6}, [5,6] + ), + ( + bindparam('test', type_=String) + text("'hi'"), + ":test || 'hi'", + "? || 'hi'", + {'test':None}, [None], + {}, {'test':None}, [None] + ), + ( + select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid'))).params({'myid':8, 'myotherid':7}), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myotherid", + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?", + {'myid':8, 'myotherid':7}, [8, 7], + {'myid':5}, {'myid':5, 'myotherid':7}, [5,7] + ), + ( + select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True), table2.c.otherid==bindparam('myid', value=8, unique=True))), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2", + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?", + {'myid_1':7, 'myid_2':8}, [7,8], + {'myid_1':5, 'myid_2':6}, {'myid_1':5, 'myid_2':6}, [5,6] + ), + ]: + + self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict) + self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect()) + nonpositional = stmt.compile() + positional = stmt.compile(dialect=sqlite.dialect()) + pp = positional.get_params() + assert [pp[k] for k in positional.positiontup] == expected_default_params_list + assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict))) + pp = positional.get_params(**test_param_dict) + assert [pp[k] for k in positional.positiontup] == expected_test_params_list + + # check that params() doesnt modify original statement + s = select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid'))) + s2 = s.params({'myid':8, 'myotherid':7}) + s3 = s2.params({'myid':9}) + assert s.compile().params == {'myid':None, 'myotherid':None} + assert s2.compile().params == {'myid':8, 'myotherid':7} + assert s3.compile().params == {'myid':9, 'myotherid':7} + + # test using same 'unique' param object twice in one compile + s = select([table1.c.myid]).where(table1.c.myid==12).as_scalar() + s2 = select([table1, s], table1.c.myid==s) + self.assert_compile(s2, + "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable WHERE mytable.myid = "\ + ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)") + positional = s2.compile(dialect=sqlite.dialect()) + + pp = positional.get_params() + assert [pp[k] for k in positional.positiontup] == [12, 12] + + # check that conflicts with "unique" params are caught + s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('myid_1'))) + assert_raises_message(exc.CompileError, "conflicts with unique bind parameter of the same name", str, s) + + s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('myid_1'))) + assert_raises_message(exc.CompileError, "conflicts with unique bind parameter of the same name", str, s) + + def test_binds_no_hash_collision(self): + """test that construct_params doesn't corrupt dict due to hash collisions""" + + total_params = 100000 + + in_clause = [':in%d' % i for i in range(total_params)] + params = dict(('in%d' % i, i) for i in range(total_params)) + sql = 'text clause %s' % ', '.join(in_clause) + t = text(sql) + assert len(t.bindparams) == total_params + c = t.compile() + pp = c.construct_params(params) + assert len(set(pp)) == total_params + assert len(set(pp.values())) == total_params + + + def test_bind_as_col(self): + t = table('foo', column('id')) + + s = select([t, literal('lala').label('hoho')]) + self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo") + + assert [str(c) for c in s.c] == ["id", "hoho"] + + def test_in(self): + self.assert_compile(select([table1], table1.c.myid.in_(['a'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1)") + + self.assert_compile(select([table1], ~table1.c.myid.in_(['a'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (:myid_1)") + + self.assert_compile(select([table1], table1.c.myid.in_(['a', 'b'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2)") + + self.assert_compile(select([table1], table1.c.myid.in_(iter(['a', 'b']))), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a')])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), 'b'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :myid_1)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), literal('b')])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :param_2)") + + self.assert_compile(select([table1], table1.c.myid.in_(['a', literal('b')])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :param_1)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal(1) + 'a'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 + :param_2)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a') +'a', 'b'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :myid_1)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a') + literal('a'), literal('b')])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :param_3)") + + self.assert_compile(select([table1], table1.c.myid.in_([1, literal(3) + 4])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :param_1 + :param_2)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a') < 'b'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 < :param_2)") + + self.assert_compile(select([table1], table1.c.myid.in_([table1.c.myid])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (mytable.myid)") + + self.assert_compile(select([table1], table1.c.myid.in_(['a', table1.c.myid])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, mytable.myid)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid +'a'])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :myid_1)") + + self.assert_compile(select([table1], table1.c.myid.in_([literal(1), 'a' + table1.c.myid])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :myid_1 + mytable.myid)") + + self.assert_compile(select([table1], table1.c.myid.in_([1, 2, 3])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1, :myid_2, :myid_3)") + + self.assert_compile(select([table1], table1.c.myid.in_(select([table2.c.otherid]))), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (SELECT myothertable.otherid FROM myothertable)") + + self.assert_compile(select([table1], ~table1.c.myid.in_(select([table2.c.otherid]))), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (SELECT myothertable.otherid FROM myothertable)") + + self.assert_compile(select([table1], table1.c.myid.in_( + union( + select([table1.c.myid], table1.c.myid == 5), + select([table1.c.myid], table1.c.myid == 12), + ) + )), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable \ +WHERE mytable.myid IN (\ +SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1 \ +UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)") + + # test that putting a select in an IN clause does not blow away its ORDER BY clause + self.assert_compile( + select([table1, table2], + table2.c.otherid.in_( + select([table2.c.otherid], order_by=[table2.c.othername], limit=10, correlate=False) + ), + from_obj=[table1.join(table2, table1.c.myid==table2.c.otherid)], order_by=[table1.c.myid] + ), + "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable "\ + "JOIN myothertable ON mytable.myid = myothertable.otherid WHERE myothertable.otherid IN (SELECT myothertable.otherid "\ + "FROM myothertable ORDER BY myothertable.othername LIMIT 10) ORDER BY mytable.myid" + ) + + # test empty in clause + self.assert_compile(select([table1], table1.c.myid.in_([])), + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != mytable.myid") + + self.assert_compile( + select([table1.c.myid.in_(select([table2.c.otherid]))]), + "SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable" + ) + self.assert_compile( + select([table1.c.myid.in_(select([table2.c.otherid]).as_scalar())]), + "SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable" + ) + + def test_cast(self): + tbl = table('casttest', + column('id', Integer), + column('v1', Float), + column('v2', Float), + column('ts', TIMESTAMP), + ) + + def check_results(dialect, expected_results, literal): + eq_(len(expected_results), 5, 'Incorrect number of expected results') + eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[0]) + eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[1]) + eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), 'CAST(casttest.ts AS %s)' %expected_results[2]) + eq_(str(cast(1234, TEXT).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3])) + eq_(str(cast('test', String(20)).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[4])) + # fixme: shoving all of this dialect-specific stuff in one test + # is now officialy completely ridiculous AND non-obviously omits + # coverage on other dialects. + sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect) + if isinstance(dialect, type(mysql.dialect())): + eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) AS anon_1 \nFROM casttest") + else: + eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest") + + # first test with Postgres engine + check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s') + + # then the Oracle engine + check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1') + + # then the sqlite engine + check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?') + + # then the MySQL engine + check_results(mysql.dialect(), ['DECIMAL(10, 2)', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'], '%s') + + self.assert_compile(cast(text('NULL'), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect()) + self.assert_compile(cast(null(), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect()) + self.assert_compile(cast(literal_column('NULL'), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect()) + + def test_date_between(self): + import datetime + table = Table('dt', metadata, + Column('date', Date)) + self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), + "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", checkparams={'date_1':datetime.date(2006,6,1), 'date_2':datetime.date(2006,6,5)}) + + self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), + "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)}) + + def test_operator_precedence(self): + table = Table('op', metadata, + Column('field', Integer)) + self.assert_compile(table.select((table.c.field == 5) == None), + "SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL") + self.assert_compile(table.select((table.c.field + 5) == table.c.field), + "SELECT op.field FROM op WHERE op.field + :field_1 = op.field") + self.assert_compile(table.select((table.c.field + 5) * 6), + "SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1") + self.assert_compile(table.select((table.c.field * 5) + 6), + "SELECT op.field FROM op WHERE op.field * :field_1 + :param_1") + self.assert_compile(table.select(5 + table.c.field.in_([5,6])), + "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:field_1, :field_2))") + self.assert_compile(table.select((5 + table.c.field).in_([5,6])), + "SELECT op.field FROM op WHERE :field_1 + op.field IN (:param_1, :param_2)") + self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))), + "SELECT op.field FROM op WHERE NOT (op.field = :field_1 AND op.field = :field_2)") + self.assert_compile(table.select(not_(table.c.field == 5)), + "SELECT op.field FROM op WHERE op.field != :field_1") + self.assert_compile(table.select(not_(table.c.field.between(5, 6))), + "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :field_1 AND :field_2)") + self.assert_compile(table.select(not_(table.c.field) == 5), + "SELECT op.field FROM op WHERE (NOT op.field) = :param_1") + self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)), + "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2") + self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)), + "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2") + + def test_naming(self): + s1 = select([table1.c.myid, table1.c.myid.label('foobar'), func.hoho(table1.c.name), func.lala(table1.c.name).label('gg')]) + assert s1.c.keys() == ['myid', 'foobar', 'hoho(mytable.name)', 'gg'] + + from sqlalchemy.databases.sqlite import SLNumeric + meta = MetaData() + t1 = Table('mytable', meta, Column('col1', Integer)) + + for col, key, expr, label in ( + (table1.c.name, 'name', 'mytable.name', None), + (table1.c.myid==12, 'mytable.myid = :myid_1', 'mytable.myid = :myid_1', 'anon_1'), + (func.hoho(table1.c.myid), 'hoho(mytable.myid)', 'hoho(mytable.myid)', 'hoho_1'), + (cast(table1.c.name, SLNumeric), 'CAST(mytable.name AS NUMERIC(10, 2))', 'CAST(mytable.name AS NUMERIC(10, 2))', 'anon_1'), + (t1.c.col1, 'col1', 'mytable.col1', None), + (column('some wacky thing'), 'some wacky thing', '"some wacky thing"', '') + ): + s1 = select([col], from_obj=getattr(col, 'table', None) or table1) + assert s1.c.keys() == [key], s1.c.keys() + + if label: + self.assert_compile(s1, "SELECT %s AS %s FROM mytable" % (expr, label)) + else: + self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,)) + + s1 = select([s1]) + if label: + self.assert_compile(s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % (label, expr, label)) + elif col.table is not None: + # sqlite rule labels subquery columns + self.assert_compile(s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % (key,expr, key)) + else: + self.assert_compile(s1, "SELECT %s FROM (SELECT %s FROM mytable)" % (expr,expr)) + +class CRUDTest(TestBase, AssertsCompiledSQL): + def test_insert(self): + # generic insert, will create bind params for all columns + self.assert_compile(insert(table1), "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)") + + # insert with user-supplied bind params for specific columns, + # cols provided literally + self.assert_compile( + insert(table1, {table1.c.myid : bindparam('userid'), table1.c.name : bindparam('username')}), + "INSERT INTO mytable (myid, name) VALUES (:userid, :username)") + + # insert with user-supplied bind params for specific columns, cols + # provided as strings + self.assert_compile( + insert(table1, dict(myid = 3, name = 'jack')), + "INSERT INTO mytable (myid, name) VALUES (:myid, :name)" + ) + + # test with a tuple of params instead of named + self.assert_compile( + insert(table1, (3, 'jack', 'mydescription')), + "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)", + checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'} + ) + + self.assert_compile( + insert(table1, values={table1.c.myid : bindparam('userid')}).values({table1.c.name : bindparam('username')}), + "INSERT INTO mytable (myid, name) VALUES (:userid, :username)" + ) + + self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())") + + def test_inline_insert(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('foo', Integer, default=func.foobar())) + self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())") + self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={}) + + def test_update(self): + self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", params = {table1.c.name:'fred'}) + self.assert_compile(table1.update().where(table1.c.myid==7).values({table1.c.myid:5}), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", checkparams={'myid':5, 'myid_1':7}) + self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1", params = {'name':'fred'}) + self.assert_compile(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid") + self.assert_compile(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'}, checkparams={'crit':'notthere', 'name':'hi'}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :myid_1", params = {'description':'test'}, checkparams={'description':'test', 'myid_1':12}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :myid_1", params = {'myid_1': 12, 'myid': 9, 'description': 'test'}) + self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1", params={'myid':18}, checkparams={'myid':18, 'myid_1':12}) + s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'}) + c = s.compile(column_keys=['id', 'name']) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :name_1), description=:description WHERE mytable.myid = :myid_1", params = {'description':'test'}) + self.assert_(str(s) == str(c)) + + self.assert_compile(update(table1, + (table1.c.myid == func.hoho(4)) & + (table1.c.name == literal('foo') + table1.c.name + literal('lala')), + values = { + table1.c.name : table1.c.name + "lala", + table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho')) + }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :name_1) " + "WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || mytable.name || :param_3") + + def test_correlated_update(self): + # test against a straight text subquery + u = update(table1, values = {table1.c.name : text("(select name from mytable where id=mytable.id)")}) + self.assert_compile(u, "UPDATE mytable SET name=(select name from mytable where id=mytable.id)") + + mt = table1.alias() + u = update(table1, values = {table1.c.name : select([mt.c.name], mt.c.myid==table1.c.myid)}) + self.assert_compile(u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)") + + # test against a regular constructed subquery + s = select([table2], table2.c.otherid == table1.c.myid) + u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s}) + self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1") + + # test a non-correlated WHERE clause + s = select([table2.c.othername], table2.c.otherid == 7) + u = update(table1, table1.c.name==s) + self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\ + "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)") + + # test one that is actually correlated... + s = select([table2.c.othername], table2.c.otherid == table1.c.myid) + u = table1.update(table1.c.name==s) + self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\ + "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") + + def test_delete(self): + self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :myid_1") + self.assert_compile(table1.delete().where(table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :myid_1") + self.assert_compile(table1.delete().where(table1.c.myid == 7).where(table1.c.name=='somename'), "DELETE FROM mytable WHERE mytable.myid = :myid_1 AND mytable.name = :name_1") + + def test_correlated_delete(self): + # test a non-correlated WHERE clause + s = select([table2.c.othername], table2.c.otherid == 7) + u = delete(table1, table1.c.name==s) + self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "\ + "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)") + + # test one that is actually correlated... + s = select([table2.c.othername], table2.c.otherid == table1.c.myid) + u = table1.delete(table1.c.name==s) + self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") + +class InlineDefaultTest(TestBase, AssertsCompiledSQL): + def test_insert(self): + m = MetaData() + foo = Table('foo', m, + Column('id', Integer)) + + t = Table('test', m, + Column('col1', Integer, default=func.foo(1)), + Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])), + ) + + self.assert_compile(t.insert(inline=True, values={}), "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), (SELECT coalesce(max(foo.id)) AS coalesce_1 FROM foo))") + + def test_update(self): + m = MetaData() + foo = Table('foo', m, + Column('id', Integer)) + + t = Table('test', m, + Column('col1', Integer, onupdate=func.foo(1)), + Column('col2', Integer, onupdate=select([func.coalesce(func.max(foo.c.id))])), + Column('col3', String(30)) + ) + + self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo_1), col2=(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM foo), col3=:col3") + +class SchemaTest(TestBase, AssertsCompiledSQL): + def test_select(self): + # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables + self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable") + self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), + "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "\ + "remote_owner.remotetable.datatype_id = :datatype_id_1 AND remote_owner.remotetable.value = :value_1") + + s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')) + s.use_labels = True + self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "\ + "AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "\ + "remote_owner.remotetable.datatype_id = :datatype_id_1 AND remote_owner.remotetable.value = :value_1") + + def test_alias(self): + a = alias(table4, 'remtable') + self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable "\ + "WHERE remtable.datatype_id = :datatype_id_1") + + def test_update(self): + self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\ + "WHERE remote_owner.remotetable.value = :value_1") + + def test_insert(self): + self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\ + "(:rem_id, :datatype_id, :value)") + |
