diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-06-10 21:18:24 +0000 |
| commit | 45cec095b4904ba71425d2fe18c143982dd08f43 (patch) | |
| tree | af5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/sql/test_generative.py | |
| parent | 698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff) | |
| download | sqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz | |
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run
the tests. [ticket:970]
Diffstat (limited to 'test/sql/test_generative.py')
| -rw-r--r-- | test/sql/test_generative.py | 818 |
1 files changed, 818 insertions, 0 deletions
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py new file mode 100644 index 000000000..ca427ca5f --- /dev/null +++ b/test/sql/test_generative.py @@ -0,0 +1,818 @@ +from sqlalchemy import * +from sqlalchemy.sql import table, column, ClauseElement +from sqlalchemy.sql.expression import _clone, _from_objects +from sqlalchemy.test import * +from sqlalchemy.sql.visitors import * +from sqlalchemy import util +from sqlalchemy.sql import util as sql_util + + +class TraversalTest(TestBase, AssertsExecutionResults): + """test ClauseVisitor's traversal, particularly its ability to copy and modify + a ClauseElement in place.""" + + @classmethod + def setup_class(cls): + global A, B + + # establish two ficticious ClauseElements. + # define deep equality semantics as well as deep identity semantics. + class A(ClauseElement): + __visit_name__ = 'a' + + def __init__(self, expr): + self.expr = expr + + def is_other(self, other): + return other is self + + __hash__ = ClauseElement.__hash__ + + def __eq__(self, other): + return other.expr == self.expr + + def __ne__(self, other): + return other.expr != self.expr + + def __str__(self): + return "A(%s)" % repr(self.expr) + + class B(ClauseElement): + __visit_name__ = 'b' + + def __init__(self, *items): + self.items = items + + def is_other(self, other): + if other is not self: + return False + for i1, i2 in zip(self.items, other.items): + if i1 is not i2: + return False + return True + + __hash__ = ClauseElement.__hash__ + + def __eq__(self, other): + for i1, i2 in zip(self.items, other.items): + if i1 != i2: + return False + return True + + def __ne__(self, other): + for i1, i2 in zip(self.items, other.items): + if i1 != i2: + return True + return False + + def _copy_internals(self, clone=_clone): + self.items = [clone(i) for i in self.items] + + def get_children(self, **kwargs): + return self.items + + def __str__(self): + return "B(%s)" % repr([str(i) for i in self.items]) + + def test_test_classes(self): + a1 = A("expr1") + struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) + struct2 = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) + struct3 = B(a1, A("expr2"), B(A("expr1b"), A("expr2bmodified")), A("expr3")) + + assert a1.is_other(a1) + assert struct.is_other(struct) + assert struct == struct2 + assert struct != struct3 + assert not struct.is_other(struct2) + assert not struct.is_other(struct3) + + def test_clone(self): + struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) + + class Vis(CloningVisitor): + def visit_a(self, a): + pass + def visit_b(self, b): + pass + + vis = Vis() + s2 = vis.traverse(struct) + assert struct == s2 + assert not struct.is_other(s2) + + def test_no_clone(self): + struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) + + class Vis(ClauseVisitor): + def visit_a(self, a): + pass + def visit_b(self, b): + pass + + vis = Vis() + s2 = vis.traverse(struct) + assert struct == s2 + assert struct.is_other(s2) + + def test_change_in_place(self): + struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) + struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), A("expr2b")), A("expr3")) + struct3 = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2bmodified")), A("expr3")) + + class Vis(CloningVisitor): + def visit_a(self, a): + if a.expr == "expr2": + a.expr = "expr2modified" + def visit_b(self, b): + pass + + vis = Vis() + s2 = vis.traverse(struct) + assert struct != s2 + assert not struct.is_other(s2) + assert struct2 == s2 + + class Vis2(CloningVisitor): + def visit_a(self, a): + if a.expr == "expr2b": + a.expr = "expr2bmodified" + def visit_b(self, b): + pass + + vis2 = Vis2() + s3 = vis2.traverse(struct) + assert struct != s3 + assert struct3 == s3 + + def test_visit_name(self): + # override fns in testlib/schema.py + from sqlalchemy import Column + + class CustomObj(Column): + pass + + assert CustomObj.__visit_name__ == Column.__visit_name__ == 'column' + + foo, bar = CustomObj('foo', String), CustomObj('bar', String) + bin = foo == bar + s = set(ClauseVisitor().iterate(bin)) + assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin]) + +class ClauseTest(TestBase, AssertsCompiledSQL): + """test copy-in-place behavior of various ClauseElements.""" + + @classmethod + def setup_class(cls): + global t1, t2 + t1 = table("table1", + column("col1"), + column("col2"), + column("col3"), + ) + t2 = table("table2", + column("col1"), + column("col2"), + column("col3"), + ) + + def test_binary(self): + clause = t1.c.col2 == t2.c.col2 + assert str(clause) == CloningVisitor().traverse(clause) + + def test_binary_anon_label_quirk(self): + t = table('t1', column('col1')) + + + f = t.c.col1 * 5 + self.assert_compile(select([f]), "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1") + + f.anon_label + + a = t.alias() + f = sql_util.ClauseAdapter(a).traverse(f) + + self.assert_compile(select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1") + + def test_join(self): + clause = t1.join(t2, t1.c.col2==t2.c.col2) + c1 = str(clause) + assert str(clause) == str(CloningVisitor().traverse(clause)) + + class Vis(CloningVisitor): + def visit_binary(self, binary): + binary.right = t2.c.col3 + + clause2 = Vis().traverse(clause) + assert c1 == str(clause) + assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3)) + + def test_text(self): + clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')]) + c1 = str(clause) + class Vis(CloningVisitor): + def visit_textclause(self, text): + text.text = text.text + " SOME MODIFIER=:lala" + text.bindparams['lala'] = bindparam('lala') + + clause2 = Vis().traverse(clause) + assert c1 == str(clause) + assert str(clause2) == c1 + " SOME MODIFIER=:lala" + assert clause.bindparams.keys() == ['bar'] + assert set(clause2.bindparams.keys()) == set(['bar', 'lala']) + + def test_select(self): + s2 = select([t1]) + s2_assert = str(s2) + s3_assert = str(select([t1], t1.c.col2==7)) + class Vis(CloningVisitor): + def visit_select(self, select): + select.append_whereclause(t1.c.col2==7) + s3 = Vis().traverse(s2) + assert str(s3) == s3_assert + assert str(s2) == s2_assert + print str(s2) + print str(s3) + class Vis(ClauseVisitor): + def visit_select(self, select): + select.append_whereclause(t1.c.col2==7) + Vis().traverse(s2) + assert str(s2) == s3_assert + + print "------------------" + + s4_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col3==9))) + class Vis(CloningVisitor): + def visit_select(self, select): + select.append_whereclause(t1.c.col3==9) + s4 = Vis().traverse(s3) + print str(s3) + print str(s4) + assert str(s4) == s4_assert + assert str(s3) == s3_assert + + print "------------------" + s5_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col1==9))) + class Vis(CloningVisitor): + def visit_binary(self, binary): + if binary.left is t1.c.col3: + binary.left = t1.c.col1 + binary.right = bindparam("col1", unique=True) + s5 = Vis().traverse(s4) + print str(s4) + print str(s5) + assert str(s5) == s5_assert + assert str(s4) == s4_assert + + def test_union(self): + u = union(t1.select(), t2.select()) + u2 = CloningVisitor().traverse(u) + assert str(u) == str(u2) + assert [str(c) for c in u2.c] == [str(c) for c in u.c] + + u = union(t1.select(), t2.select()) + cols = [str(c) for c in u.c] + u2 = CloningVisitor().traverse(u) + assert str(u) == str(u2) + assert [str(c) for c in u2.c] == cols + + s1 = select([t1], t1.c.col1 == bindparam('id_param')) + s2 = select([t2]) + u = union(s1, s2) + + u2 = u.params(id_param=7) + u3 = u.params(id_param=10) + assert str(u) == str(u2) == str(u3) + assert u2.compile().params == {'id_param':7} + assert u3.compile().params == {'id_param':10} + + def test_adapt_union(self): + u = union(t1.select().where(t1.c.col1==4), t1.select().where(t1.c.col1==5)).alias() + + assert sql_util.ClauseAdapter(u).traverse(t1) is u + + def test_binds(self): + """test that unique bindparams change their name upon clone() to prevent conflicts""" + + s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias() + s2 = CloningVisitor().traverse(s).alias() + s3 = select([s], s.c.col2==s2.c.col2) + + self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\ + "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) AS anon_1, "\ + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 "\ + "WHERE anon_1.col2 = anon_2.col2") + + s = select([t1], t1.c.col1==4).alias() + s2 = CloningVisitor().traverse(s).alias() + s3 = select([s], s.c.col2==s2.c.col2) + self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\ + "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1, "\ + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 "\ + "WHERE anon_1.col2 = anon_2.col2") + + @testing.emits_warning('.*replaced by another column with the same key') + def test_alias(self): + subq = t2.select().alias('subq') + s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)]) + orig = str(s) + s2 = CloningVisitor().traverse(s) + assert orig == str(s) == str(s2) + + s4 = CloningVisitor().traverse(s2) + assert orig == str(s) == str(s2) == str(s4) + + s3 = sql_util.ClauseAdapter(table('foo')).traverse(s) + assert orig == str(s) == str(s3) + + s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3) + assert orig == str(s) == str(s3) == str(s4) + + def test_correlated_select(self): + s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2) + class Vis(CloningVisitor): + def visit_select(self, select): + select.append_whereclause(t1.c.col2==7) + + self.assert_compile(Vis().traverse(s), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :col2_1") + + def test_this_thing(self): + s = select([t1]).where(t1.c.col1=='foo').alias() + s2 = select([s.c.col1]) + + self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1") + t1a = t1.alias() + s2 = sql_util.ClauseAdapter(t1a).traverse(s2) + self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1_1.col1 AS col1, table1_1.col2 AS col2, table1_1.col3 AS col3 FROM table1 AS table1_1 WHERE table1_1.col1 = :col1_1) AS anon_1") + + def test_select_fromtwice(self): + t1a = t1.alias() + + s = select([1], t1.c.col1==t1a.c.col1, from_obj=t1a).correlate(t1) + self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1") + + s = CloningVisitor().traverse(s) + self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1") + + s = select([t1]).where(t1.c.col1=='foo').alias() + + s2 = select([1], t1.c.col1==s.c.col1, from_obj=s).correlate(t1) + self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1") + s2 = ReplacingCloningVisitor().traverse(s2) + self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1") + +class ClauseAdapterTest(TestBase, AssertsCompiledSQL): + @classmethod + def setup_class(cls): + global t1, t2 + t1 = table("table1", + column("col1"), + column("col2"), + column("col3"), + ) + t2 = table("table2", + column("col1"), + column("col2"), + column("col3"), + ) + + def test_correlation_on_clone(self): + t1alias = t1.alias('t1alias') + t2alias = t2.alias('t2alias') + vis = sql_util.ClauseAdapter(t1alias) + + s = select(['*'], from_obj=[t1alias, t2alias]).as_scalar() + assert t2alias in s._froms + assert t1alias in s._froms + + self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") + s = vis.traverse(s) + + assert t2alias not in s._froms # not present because it's been cloned + + assert t1alias in s._froms # present because the adapter placed it there + + # correlate list on "s" needs to take into account the full _cloned_set for each element in _froms when correlating + self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") + + s = select(['*'], from_obj=[t1alias, t2alias]).correlate(t2alias).as_scalar() + self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") + s = vis.traverse(s) + self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") + s = CloningVisitor().traverse(s) + self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") + + s = select(['*']).where(t1.c.col1==t2.c.col1).as_scalar() + self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1") + vis = sql_util.ClauseAdapter(t1alias) + s = vis.traverse(s) + self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias") + s = CloningVisitor().traverse(s) + self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias") + + s = select(['*']).where(t1.c.col1==t2.c.col1).correlate(t1).as_scalar() + self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1") + vis = sql_util.ClauseAdapter(t1alias) + s = vis.traverse(s) + self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias") + s = CloningVisitor().traverse(s) + self.assert_compile(select([t1alias.c.col1, s]), "SELECT t1alias.col1, (SELECT * FROM table2 WHERE t1alias.col1 = table2.col1) AS anon_1 FROM table1 AS t1alias") + + @testing.fails_on_everything_except() + def test_joins_dont_adapt(self): + # adapting to a join, i.e. ClauseAdapter(t1.join(t2)), doesn't make much sense. + # ClauseAdapter doesn't make any changes if it's against a straight join. + users = table('users', column('id')) + addresses = table('addresses', column('id'), column('user_id')) + + ualias = users.alias() + + s = select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users) #.as_scalar().label(None) + s= sql_util.ClauseAdapter(ualias).traverse(s) + + j1 = addresses.join(ualias, addresses.c.user_id==ualias.c.id) + + self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s), "SELECT count(addresses.id) AS count_1 FROM addresses WHERE users_1.id = addresses.user_id") + + def test_table_to_alias(self): + + t1alias = t1.alias('t1alias') + + vis = sql_util.ClauseAdapter(t1alias) + ff = vis.traverse(func.count(t1.c.col1).label('foo')) + assert list(_from_objects(ff)) == [t1alias] + + self.assert_compile(vis.traverse(select(['*'], from_obj=[t1])), "SELECT * FROM table1 AS t1alias") + self.assert_compile(select(['*'], t1.c.col1==t2.c.col2), "SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2)), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2])), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1)), "SELECT * FROM table2 WHERE t1alias.col1 = table2.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2)), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = table2.col2") + + self.assert_compile(vis.traverse(case([(t1.c.col1==5, t1.c.col2)], else_=t1.c.col1)), + "CASE WHEN (t1alias.col1 = :col1_1) THEN t1alias.col2 ELSE t1alias.col1 END" + ) + self.assert_compile(vis.traverse(case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1)), + "CASE t1alias.col1 WHEN :param_1 THEN t1alias.col2 ELSE t1alias.col1 END" + ) + + + s = select(['*'], from_obj=[t1]).alias('foo') + self.assert_compile(s.select(), "SELECT foo.* FROM (SELECT * FROM table1) AS foo") + self.assert_compile(vis.traverse(s.select()), "SELECT foo.* FROM (SELECT * FROM table1 AS t1alias) AS foo") + self.assert_compile(s.select(), "SELECT foo.* FROM (SELECT * FROM table1) AS foo") + + ff = vis.traverse(func.count(t1.c.col1).label('foo')) + self.assert_compile(select([ff]), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias") + assert list(_from_objects(ff)) == [t1alias] + +# TODO: + # self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias") + + t2alias = t2.alias('t2alias') + vis.chain(sql_util.ClauseAdapter(t2alias)) + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2)), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2])), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1)), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") + self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2)), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2") + + def test_include_exclude(self): + m = MetaData() + a=Table( 'a',m, + Column( 'id', Integer, primary_key=True), + Column( 'xxx_id', Integer, ForeignKey( 'a.id', name='adf',use_alter=True ) ) + ) + + e = (a.c.id == a.c.xxx_id) + assert str(e) == "a.id = a.xxx_id" + b = a.alias() + + e = sql_util.ClauseAdapter( b, include= set([ a.c.id ]), + equivalents= { a.c.id: set([ a.c.id]) } + ).traverse( e) + + assert str(e) == "a_1.id = a.xxx_id" + + def test_recursive_equivalents(self): + m = MetaData() + a = Table('a', m, Column('x', Integer), Column('y', Integer)) + b = Table('b', m, Column('x', Integer), Column('y', Integer)) + c = Table('c', m, Column('x', Integer), Column('y', Integer)) + + # force a recursion overflow, by linking a.c.x<->c.c.x, and + # asking for a nonexistent col. corresponding_column should prevent + # endless depth. + adapt = sql_util.ClauseAdapter( b, equivalents= {a.c.x: set([ c.c.x]), c.c.x:set([a.c.x])}) + assert adapt._corresponding_column(a.c.x, False) is None + + def test_multilevel_equivalents(self): + m = MetaData() + a = Table('a', m, Column('x', Integer), Column('y', Integer)) + b = Table('b', m, Column('x', Integer), Column('y', Integer)) + c = Table('c', m, Column('x', Integer), Column('y', Integer)) + + alias = select([a]).select_from(a.join(b, a.c.x==b.c.x)).alias() + + # two levels of indirection from c.x->b.x->a.x, requires recursive + # corresponding_column call + adapt = sql_util.ClauseAdapter(alias, equivalents= {b.c.x: set([ a.c.x]), c.c.x:set([b.c.x])}) + assert adapt._corresponding_column(a.c.x, False) is alias.c.x + assert adapt._corresponding_column(c.c.x, False) is alias.c.x + + def test_join_to_alias(self): + metadata = MetaData() + a = Table('a', metadata, + Column('id', Integer, primary_key=True)) + b = Table('b', metadata, + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) + c = Table('c', metadata, + Column('id', Integer, primary_key=True), + Column('bid', Integer, ForeignKey('b.id')), + ) + + d = Table('d', metadata, + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) + + j1 = a.outerjoin(b) + j2 = select([j1], use_labels=True) + + j3 = c.join(j2, j2.c.b_id==c.c.bid) + + j4 = j3.outerjoin(d) + self.assert_compile(j4, "c JOIN (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid FROM a LEFT OUTER JOIN b ON a.id = b.aid) " + "ON b_id = c.bid" + " LEFT OUTER JOIN d ON a_id = d.aid") + j5 = j3.alias('foo') + j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0] + + # this statement takes c join(a join b), wraps it inside an aliased "select * from c join(a join b) AS foo". + # the outermost right side "left outer join d" stays the same, except "d" joins against foo.a_id instead + # of plain "a_id" + self.assert_compile(j6, "(SELECT c.id AS c_id, c.bid AS c_bid, a_id AS a_id, b_id AS b_id, b_aid AS b_aid FROM " + "c JOIN (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid FROM a LEFT OUTER JOIN b ON a.id = b.aid) " + "ON b_id = c.bid) AS foo" + " LEFT OUTER JOIN d ON foo.a_id = d.aid") + + def test_derived_from(self): + assert select([t1]).is_derived_from(t1) + assert not select([t2]).is_derived_from(t1) + assert not t1.is_derived_from(select([t1])) + assert t1.alias().is_derived_from(t1) + + + s1 = select([t1, t2]).alias('foo') + s2 = select([s1]).limit(5).offset(10).alias() + assert s2.is_derived_from(s1) + s2 = s2._clone() + assert s2.is_derived_from(s1) + + def test_aliasedselect_to_aliasedselect(self): + # original issue from ticket #904 + s1 = select([t1]).alias('foo') + s2 = select([s1]).limit(5).offset(10).alias() + + self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1), + "SELECT foo.col1, foo.col2, foo.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10") + + j = s1.outerjoin(t2, s1.c.col1==t2.c.col1) + self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), + "SELECT anon_1.col1, anon_1.col2, anon_1.col3, table2.col1, table2.col2, table2.col3 FROM "\ + "(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\ + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\ + "LEFT OUTER JOIN table2 ON anon_1.col1 = table2.col1") + + talias = t1.alias('bar') + j = s1.outerjoin(talias, s1.c.col1==talias.c.col1) + self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), + "SELECT anon_1.col1, anon_1.col2, anon_1.col3, bar.col1, bar.col2, bar.col3 FROM "\ + "(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\ + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\ + "LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1") + + def test_functions(self): + self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)), "count(table1_1.col1)") + + s = select([func.count(t1.c.col1)]) + self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s), "SELECT count(table1_1.col1) AS count_1 FROM table1 AS table1_1") + + def test_recursive(self): + metadata = MetaData() + a = Table('a', metadata, + Column('id', Integer, primary_key=True)) + b = Table('b', metadata, + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) + c = Table('c', metadata, + Column('id', Integer, primary_key=True), + Column('bid', Integer, ForeignKey('b.id')), + ) + + d = Table('d', metadata, + Column('id', Integer, primary_key=True), + Column('aid', Integer, ForeignKey('a.id')), + ) + + u = union( + a.join(b).select().apply_labels(), + a.join(d).select().apply_labels() + ).alias() + + self.assert_compile( + sql_util.ClauseAdapter(u).traverse(select([c.c.bid]).where(c.c.bid==u.c.b_aid)), + "SELECT c.bid "\ + "FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid "\ + "FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id AS d_id, d.aid AS d_aid "\ + "FROM a JOIN d ON a.id = d.aid) AS anon_1 "\ + "WHERE c.bid = anon_1.b_aid" + ) + +class SpliceJoinsTest(TestBase, AssertsCompiledSQL): + @classmethod + def setup_class(cls): + global table1, table2, table3, table4 + def _table(name): + return table(name, column("col1"), column("col2"),column("col3")) + + table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")] + + def test_splice(self): + (t1, t2, t3, t4) = (table1, table2, table1.alias(), table2.alias()) + + j = t1.join(t2, t1.c.col1==t2.c.col1).join(t3, t2.c.col1==t3.c.col1).join(t4, t4.c.col1==t1.c.col1) + + s = select([t1]).where(t1.c.col2<5).alias() + + self.assert_compile(sql_util.splice_joins(s, j), + "(SELECT table1.col1 AS col1, table1.col2 AS col2, "\ + "table1.col3 AS col3 FROM table1 WHERE table1.col2 < :col2_1) AS anon_1 "\ + "JOIN table2 ON anon_1.col1 = table2.col1 JOIN table1 AS table1_1 ON table2.col1 = table1_1.col1 "\ + "JOIN table2 AS table2_1 ON table2_1.col1 = anon_1.col1") + + def test_stop_on(self): + (t1, t2, t3) = (table1, table2, table3) + + j1= t1.join(t2, t1.c.col1==t2.c.col1) + j2 = j1.join(t3, t2.c.col1==t3.c.col1) + + s = select([t1]).select_from(j1).alias() + + self.assert_compile(sql_util.splice_joins(s, j2), + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 JOIN table2 "\ + "ON table1.col1 = table2.col1) AS anon_1 JOIN table2 ON anon_1.col1 = table2.col1 JOIN table3 "\ + "ON table2.col1 = table3.col1" + ) + + self.assert_compile(sql_util.splice_joins(s, j2, j1), + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 "\ + "JOIN table2 ON table1.col1 = table2.col1) AS anon_1 JOIN table3 ON table2.col1 = table3.col1") + + def test_splice_2(self): + t2a = table2.alias() + t3a = table3.alias() + j1 = table1.join(t2a, table1.c.col1==t2a.c.col1).join(t3a, t2a.c.col2==t3a.c.col2) + + t2b = table4.alias() + j2 = table1.join(t2b, table1.c.col3==t2b.c.col3) + + self.assert_compile(sql_util.splice_joins(table1, j1), + "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\ + "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2") + + self.assert_compile(sql_util.splice_joins(table1, j2), "table1 JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3") + + self.assert_compile(sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2), + "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\ + "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2 "\ + "JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3") + + +class SelectTest(TestBase, AssertsCompiledSQL): + """tests the generative capability of Select""" + + @classmethod + def setup_class(cls): + global t1, t2 + t1 = table("table1", + column("col1"), + column("col2"), + column("col3"), + ) + t2 = table("table2", + column("col1"), + column("col2"), + column("col3"), + ) + + def test_select(self): + self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), + "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :col1_1 ORDER BY table1.col3") + + self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), + "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\ + "FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3") + + s = select([t2], t2.c.col1==t1.c.col1, correlate=False) + s = s.correlate(t1).order_by(t2.c.col3) + self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), + "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\ + "FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3") + + def test_columns(self): + s = t1.select() + self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") + select_copy = s.column('yyy') + self.assert_compile(select_copy, "SELECT table1.col1, table1.col2, table1.col3, yyy FROM table1") + assert s.columns is not select_copy.columns + assert s._columns is not select_copy._columns + assert s._raw_columns is not select_copy._raw_columns + self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") + + def test_froms(self): + s = t1.select() + self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") + select_copy = s.select_from(t2) + self.assert_compile(select_copy, "SELECT table1.col1, table1.col2, table1.col3 FROM table1, table2") + assert s._froms is not select_copy._froms + self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") + + def test_correlation(self): + s = select([t2], t1.c.col1==t2.c.col1) + self.assert_compile(s, "SELECT table2.col1, table2.col2, table2.col3 FROM table2, table1 WHERE table1.col1 = table2.col1") + s2 = select([t1], t1.c.col2==s.c.col2) + self.assert_compile(s2, "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " + "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 " + "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") + + s3 = s.correlate(None) + self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " + "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 " + "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") + self.assert_compile(select([t1], t1.c.col2==s.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " + "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 " + "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") + s4 = s3.correlate(t1) + self.assert_compile(select([t1], t1.c.col2==s4.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " + "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 " + "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") + self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " + "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 " + "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") + + def test_prefixes(self): + s = t1.select() + self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") + select_copy = s.prefix_with("FOOBER") + self.assert_compile(select_copy, "SELECT FOOBER table1.col1, table1.col2, table1.col3 FROM table1") + self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") + + +class InsertTest(TestBase, AssertsCompiledSQL): + """Tests the generative capability of Insert""" + + # fixme: consolidate converage from elsewhere here and expand + + @classmethod + def setup_class(cls): + global t1, t2 + t1 = table("table1", + column("col1"), + column("col2"), + column("col3"), + ) + t2 = table("table2", + column("col1"), + column("col2"), + column("col3"), + ) + + def test_prefixes(self): + i = t1.insert() + self.assert_compile(i, + "INSERT INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)") + + gen = i.prefix_with("foober") + self.assert_compile(gen, + "INSERT foober INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)") + + self.assert_compile(i, + "INSERT INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)") + + i2 = t1.insert(prefixes=['squiznart']) + self.assert_compile(i2, + "INSERT squiznart INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)") + + gen2 = i2.prefix_with("quux") + self.assert_compile(gen2, + "INSERT squiznart quux INTO " + "table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)") + |
