diff options
Diffstat (limited to 'test/sql/test_external_traversal.py')
| -rw-r--r-- | test/sql/test_external_traversal.py | 115 |
1 files changed, 101 insertions, 14 deletions
diff --git a/test/sql/test_external_traversal.py b/test/sql/test_external_traversal.py index b7e58dad9..21b5b2d27 100644 --- a/test/sql/test_external_traversal.py +++ b/test/sql/test_external_traversal.py @@ -1372,13 +1372,17 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): column_adapter = sql_util.ColumnAdapter(stmt2) is_(column_adapter.columns[expr], stmt2.selected_columns[3]) - def test_correlate_except_on_clone(self): + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_correlate_except_on_clone(self, use_adapt_from): # test [ticket:4537]'s issue t1alias = t1.alias("t1alias") j = t1.join(t1alias, t1.c.col1 == t1alias.c.col2) - vis = sql_util.ClauseAdapter(j) + if use_adapt_from: + vis = sql_util.ClauseAdapter(j, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(j) # "control" subquery - uses correlate which has worked w/ adaption # for a long time @@ -1456,6 +1460,65 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "JOIN table2 ON table1.col1 = table2.col1", ) + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_correlate_except_with_mixed_tables(self, use_adapt_from): + # test [ticket:6060]'s issue + + stmt = select( + t1.c.col1, + select(func.count(t2.c.col1)) + .where(t2.c.col1 == t1.c.col1) + .correlate_except(t2) + .scalar_subquery(), + ) + self.assert_compile( + stmt, + "SELECT table1.col1, " + "(SELECT count(table2.col1) AS count_1 FROM table2 " + "WHERE table2.col1 = table1.col1) AS anon_1 " + "FROM table1", + ) + + subq = ( + select(t1) + .join(t2, t1.c.col1 == t2.c.col1) + .where(t2.c.col2 == "x") + .subquery() + ) + + if use_adapt_from: + vis = sql_util.ClauseAdapter(subq, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(subq) + + if use_adapt_from: + self.assert_compile( + vis.traverse(stmt), + "SELECT anon_1.col1, " + "(SELECT count(table2.col1) AS count_1 FROM table2 WHERE " + "table2.col1 = anon_1.col1) AS anon_2 " + "FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 JOIN table2 ON table1.col1 = " + "table2.col1 WHERE table2.col2 = :col2_1) AS anon_1", + ) + else: + # here's the buggy version. table2 gets yanked out of the + # correlated subquery also. AliasedClass now uses + # adapt_from_selectables in all cases + self.assert_compile( + vis.traverse(stmt), + "SELECT anon_1.col1, " + "(SELECT count(table2.col1) AS count_1 FROM table2, " + "(SELECT table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 JOIN table2 ON " + "table1.col1 = table2.col1 WHERE table2.col2 = :col2_1) AS " + "anon_1 WHERE table2.col1 = anon_1.col1) AS anon_2 " + "FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 JOIN table2 " + "ON table1.col1 = table2.col1 " + "WHERE table2.col2 = :col2_1) AS anon_1", + ) + @testing.fails_on_everything_except() def test_joins_dont_adapt(self): # adapting to a join, i.e. ClauseAdapter(t1.join(t2)), doesn't @@ -1483,24 +1546,36 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "addresses.user_id", ) - def test_table_to_alias_1(self): + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_table_to_alias_1(self, use_adapt_from): t1alias = t1.alias("t1alias") - vis = sql_util.ClauseAdapter(t1alias) + if use_adapt_from: + vis = sql_util.ClauseAdapter(t1alias, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(t1alias) ff = vis.traverse(func.count(t1.c.col1).label("foo")) assert list(_from_objects(ff)) == [t1alias] - def test_table_to_alias_2(self): + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_table_to_alias_2(self, use_adapt_from): t1alias = t1.alias("t1alias") - vis = sql_util.ClauseAdapter(t1alias) + if use_adapt_from: + vis = sql_util.ClauseAdapter(t1alias, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( vis.traverse(select(literal_column("*")).select_from(t1)), "SELECT * FROM table1 AS t1alias", ) - def test_table_to_alias_3(self): + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_table_to_alias_3(self, use_adapt_from): t1alias = t1.alias("t1alias") - vis = sql_util.ClauseAdapter(t1alias) + if use_adapt_from: + vis = sql_util.ClauseAdapter(t1alias, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( vis.traverse( select(literal_column("*")).where(t1.c.col1 == t2.c.col2) @@ -1509,9 +1584,13 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE t1alias.col1 = table2.col2", ) - def test_table_to_alias_4(self): + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_table_to_alias_4(self, use_adapt_from): t1alias = t1.alias("t1alias") - vis = sql_util.ClauseAdapter(t1alias) + if use_adapt_from: + vis = sql_util.ClauseAdapter(t1alias, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( vis.traverse( select(literal_column("*")) @@ -1522,9 +1601,13 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE t1alias.col1 = table2.col2", ) - def test_table_to_alias_5(self): + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_table_to_alias_5(self, use_adapt_from): t1alias = t1.alias("t1alias") - vis = sql_util.ClauseAdapter(t1alias) + if use_adapt_from: + vis = sql_util.ClauseAdapter(t1alias, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( select(t1alias, t2).where( t1alias.c.col1 @@ -1543,9 +1626,13 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)", ) - def test_table_to_alias_6(self): + @testing.combinations((True,), (False,), argnames="use_adapt_from") + def test_table_to_alias_6(self, use_adapt_from): t1alias = t1.alias("t1alias") - vis = sql_util.ClauseAdapter(t1alias) + if use_adapt_from: + vis = sql_util.ClauseAdapter(t1alias, adapt_from_selectables=[t1]) + else: + vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( select(t1alias, t2).where( t1alias.c.col1 |
