diff options
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_functions.py | 15 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 179 | ||||
| -rw-r--r-- | test/sql/test_text.py | 14 |
3 files changed, 198 insertions, 10 deletions
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index d0e68f1e3..a46d1af54 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -99,6 +99,21 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): select([func.foo()], use_labels=True), "SELECT foo() AS foo_1" ) + def test_use_labels_function_element(self): + from sqlalchemy.ext.compiler import compiles + + class max_(FunctionElement): + name = "max" + + @compiles(max_) + def visit_max(element, compiler, **kw): + return "max(%s)" % compiler.process(element.clauses, **kw) + + self.assert_compile( + select([max_(5, 6)], use_labels=True), + "SELECT max(:max_2, :max_3) AS max_1", + ) + def test_underscores(self): self.assert_compile(func.if_(), "if()") diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 35353671c..2563c7d0c 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -26,7 +26,10 @@ from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql import expression from sqlalchemy.sql.selectable import TextualSelect +from sqlalchemy.sql.sqltypes import NULLTYPE from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import assertions @@ -35,6 +38,7 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import in_ from sqlalchemy.testing import is_ +from sqlalchemy.testing import is_false from sqlalchemy.testing import is_true from sqlalchemy.testing import le_ from sqlalchemy.testing import ne_ @@ -721,6 +725,13 @@ class ResultProxyTest(fixtures.TablesTest): lambda: r["user_id"], ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + result._getter, + "user_id", + ) + # pure positional targeting; users.c.user_id # and addresses.c.user_id are known! # works as of 1.1 issue #3501 @@ -856,7 +867,6 @@ class ResultProxyTest(fixtures.TablesTest): addresses = self.tables.addresses with testing.db.connect() as conn: - # MARKMARK conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) conn.execute( addresses.insert(), @@ -1344,19 +1354,107 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row.keyed1_a, "a1") eq_(row.keyed1_c, "c1") + def _test_keyed_targeting_no_label_at_all(self, expression): + lt = literal_column("2") + stmt = select([literal_column("1"), expression, lt]).select_from( + self.tables.keyed1 + ) + row = testing.db.execute(stmt).first() + + eq_(row[expression], "a1") + eq_(row[lt], 2) + + # Postgresql for example has the key as "?column?", which dupes + # easily. we get around that because we know that "2" is unique + eq_(row["2"], 2) + + def test_keyed_targeting_no_label_at_all_one(self): + class not_named_max(expression.ColumnElement): + name = "not_named_max" + + @compiles(not_named_max) + def visit_max(element, compiler, **kw): + # explicit add + kw["add_to_result_map"](None, None, (element,), NULLTYPE) + return "max(a)" + + # assert that there is no "AS max_" or any label of any kind. + eq_(str(select([not_named_max()])), "SELECT max(a)") + + nnm = not_named_max() + self._test_keyed_targeting_no_label_at_all(nnm) + + def test_keyed_targeting_no_label_at_all_two(self): + class not_named_max(expression.ColumnElement): + name = "not_named_max" + + @compiles(not_named_max) + def visit_max(element, compiler, **kw): + # we don't add to keymap here; compiler should be doing it + return "max(a)" + + # assert that there is no "AS max_" or any label of any kind. + eq_(str(select([not_named_max()])), "SELECT max(a)") + + nnm = not_named_max() + self._test_keyed_targeting_no_label_at_all(nnm) + + def test_keyed_targeting_no_label_at_all_text(self): + t1 = text("max(a)") + t2 = text("min(a)") + + stmt = select([t1, t2]).select_from(self.tables.keyed1) + row = testing.db.execute(stmt).first() + + eq_(row[t1], "a1") + eq_(row[t2], "a1") + @testing.requires.duplicate_names_in_cursor_description def test_keyed_accessor_composite_conflict_2(self): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 row = testing.db.execute(select([keyed1, keyed2])).first() - # row.b is unambiguous - eq_(row.b, "b2") + + # column access is unambiguous + eq_(row[self.tables.keyed2.c.b], "b2") + # row.a is ambiguous assert_raises_message( exc.InvalidRequestError, "Ambig", getattr, row, "a" ) + # for "b" we have kind of a choice. the name "b" is not ambiguous in + # cursor.description in this case. It is however ambiguous as far as + # the objects we have queried against, because keyed1.c.a has key="b" + # and keyed1.c.b is "b". historically this was allowed as + # non-ambiguous, however the column it targets changes based on + # whether or not the dupe is present so it's ambiguous + # eq_(row.b, "b2") + assert_raises_message( + exc.InvalidRequestError, "Ambig", getattr, row, "b" + ) + + # illustrate why row.b above is ambiguous, and not "b2"; because + # if we didn't have keyed2, now it matches row.a. a new column + # shouldn't be able to grab the value from a previous column. + row = testing.db.execute(select([keyed1])).first() + eq_(row.b, "a1") + + def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(self): + keyed1 = self.tables.keyed1 + keyed2 = self.tables.keyed2 + + row = testing.db.execute( + select([keyed1, keyed2]).apply_labels() + ).first() + + # column access is unambiguous + eq_(row[self.tables.keyed2.c.b], "b2") + + eq_(row["keyed2_b"], "b2") + eq_(row["keyed1_a"], "a1") + def test_keyed_accessor_composite_names_precedent(self): keyed1 = self.tables.keyed1 keyed4 = self.tables.keyed4 @@ -1374,13 +1472,13 @@ class KeyTargetingTest(fixtures.TablesTest): row = testing.db.execute(select([keyed1, keyed3])).first() eq_(row.q, "c1") - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name 'a'", - getattr, - row, - "b", - ) + + # prior to 1.4 #4887, this raised an "ambiguous column name 'a'"" + # message, because "b" is linked to "a" which is a dupe. but we know + # where "b" is in the row by position. + eq_(row.b, "a1") + + # "a" is of course ambiguous assert_raises_message( exc.InvalidRequestError, "Ambiguous column name 'a'", @@ -1406,6 +1504,67 @@ class KeyTargetingTest(fixtures.TablesTest): assert_raises(KeyError, lambda: row["keyed2_c"]) assert_raises(KeyError, lambda: row["keyed2_q"]) + def test_keyed_accessor_column_is_repeated_multiple_times(self): + # test new logic added as a result of the combination of #4892 and + # #4887. We allow duplicate columns, but we also have special logic + # to disambiguate for the same column repeated, and as #4887 adds + # stricter ambiguous result column logic, the compiler has to know to + # not add these dupe columns to the result map, else they register as + # ambiguous. + + keyed2 = self.tables.keyed2 + keyed3 = self.tables.keyed3 + + stmt = select( + [ + keyed2.c.a, + keyed3.c.a, + keyed2.c.a, + keyed2.c.a, + keyed3.c.a, + keyed3.c.a, + keyed3.c.d, + keyed3.c.d, + ] + ).apply_labels() + + result = testing.db.execute(stmt) + is_false(result._metadata.matched_on_name) + + # ensure the result map is the same number of cols so we can + # use positional targeting + eq_( + [rec[0] for rec in result.context.compiled._result_columns], + [ + "keyed2_a", + "keyed3_a", + "keyed2_a__1", + "keyed2_a__1", + "keyed3_a__1", + "keyed3_a__1", + "keyed3_d", + "keyed3_d__1", + ], + ) + row = result.first() + + # keyed access will ignore the dupe cols + eq_(row[keyed2.c.a], "a2") + eq_(row[keyed3.c.a], "a3") + eq_(result._getter(keyed3.c.a)(row), "a3") + eq_(row[keyed3.c.d], "d3") + + # however we can get everything positionally + eq_(row, ("a2", "a3", "a2", "a2", "a3", "a3", "d3", "d3")) + eq_(row[0], "a2") + eq_(row[1], "a3") + eq_(row[2], "a2") + eq_(row[3], "a2") + eq_(row[4], "a3") + eq_(row[5], "a3") + eq_(row[6], "d3") + eq_(row[7], "d3") + def test_columnclause_schema_column_one(self): # originally addressed by [ticket:2932], however liberalized # Column-targeting rules are deprecated diff --git a/test/sql/test_text.py b/test/sql/test_text.py index 6af2cffcf..3f12d06a9 100644 --- a/test/sql/test_text.py +++ b/test/sql/test_text.py @@ -20,6 +20,7 @@ from sqlalchemy import union from sqlalchemy import util from sqlalchemy.sql import column from sqlalchemy.sql import quoted_name +from sqlalchemy.sql import sqltypes from sqlalchemy.sql import table from sqlalchemy.sql import util as sql_util from sqlalchemy.testing import assert_raises_message @@ -49,6 +50,19 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "select * from foo where lala = bar", ) + def test_text_adds_to_result_map(self): + t1, t2 = text("t1"), text("t2") + + stmt = select([t1, t2]) + compiled = stmt.compile() + eq_( + compiled._result_columns, + [ + (None, None, (t1,), sqltypes.NULLTYPE), + (None, None, (t2,), sqltypes.NULLTYPE), + ], + ) + class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL): |
