diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-10-03 17:36:27 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-10-07 23:06:06 -0400 |
| commit | 65aee6cce57fd1cca3a95814feff3ed99a5a51ee (patch) | |
| tree | 0352d74938902a9242dfb97ca5215d9191a2ad16 /test/sql | |
| parent | ebd9788c986c56b8b845fa83609a6eb2c0cef083 (diff) | |
| download | sqlalchemy-65aee6cce57fd1cca3a95814feff3ed99a5a51ee.tar.gz | |
Add result map targeting for custom compiled, text objects
In order for text(), custom compiled objects, etc. to be usable
by Query(), they are all targeted by object key in the result map.
As we no longer want Query to implicitly label these, as well as that
text() has no label feature, support adding entries to the result
map that have no name, key, or type, only the object itself, and
then ensure that the compiler sets up for positional targeting
when this condition is detected.
Allows for more flexible ORM query usage with custom expressions
and text() while having less special logic in query itself.
Fixes: #4887
Change-Id: Ie073da127d292d43cb132a2b31bc90af88bfe2fd
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_functions.py | 15 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 179 | ||||
| -rw-r--r-- | test/sql/test_text.py | 14 |
3 files changed, 198 insertions, 10 deletions
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index d0e68f1e3..a46d1af54 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -99,6 +99,21 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): select([func.foo()], use_labels=True), "SELECT foo() AS foo_1" ) + def test_use_labels_function_element(self): + from sqlalchemy.ext.compiler import compiles + + class max_(FunctionElement): + name = "max" + + @compiles(max_) + def visit_max(element, compiler, **kw): + return "max(%s)" % compiler.process(element.clauses, **kw) + + self.assert_compile( + select([max_(5, 6)], use_labels=True), + "SELECT max(:max_2, :max_3) AS max_1", + ) + def test_underscores(self): self.assert_compile(func.if_(), "if()") diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 35353671c..2563c7d0c 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -26,7 +26,10 @@ from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql import expression from sqlalchemy.sql.selectable import TextualSelect +from sqlalchemy.sql.sqltypes import NULLTYPE from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import assertions @@ -35,6 +38,7 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import in_ from sqlalchemy.testing import is_ +from sqlalchemy.testing import is_false from sqlalchemy.testing import is_true from sqlalchemy.testing import le_ from sqlalchemy.testing import ne_ @@ -721,6 +725,13 @@ class ResultProxyTest(fixtures.TablesTest): lambda: r["user_id"], ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + result._getter, + "user_id", + ) + # pure positional targeting; users.c.user_id # and addresses.c.user_id are known! # works as of 1.1 issue #3501 @@ -856,7 +867,6 @@ class ResultProxyTest(fixtures.TablesTest): addresses = self.tables.addresses with testing.db.connect() as conn: - # MARKMARK conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) conn.execute( addresses.insert(), @@ -1344,19 +1354,107 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row.keyed1_a, "a1") eq_(row.keyed1_c, "c1") + def _test_keyed_targeting_no_label_at_all(self, expression): + lt = literal_column("2") + stmt = select([literal_column("1"), expression, lt]).select_from( + self.tables.keyed1 + ) + row = testing.db.execute(stmt).first() + + eq_(row[expression], "a1") + eq_(row[lt], 2) + + # Postgresql for example has the key as "?column?", which dupes + # easily. we get around that because we know that "2" is unique + eq_(row["2"], 2) + + def test_keyed_targeting_no_label_at_all_one(self): + class not_named_max(expression.ColumnElement): + name = "not_named_max" + + @compiles(not_named_max) + def visit_max(element, compiler, **kw): + # explicit add + kw["add_to_result_map"](None, None, (element,), NULLTYPE) + return "max(a)" + + # assert that there is no "AS max_" or any label of any kind. + eq_(str(select([not_named_max()])), "SELECT max(a)") + + nnm = not_named_max() + self._test_keyed_targeting_no_label_at_all(nnm) + + def test_keyed_targeting_no_label_at_all_two(self): + class not_named_max(expression.ColumnElement): + name = "not_named_max" + + @compiles(not_named_max) + def visit_max(element, compiler, **kw): + # we don't add to keymap here; compiler should be doing it + return "max(a)" + + # assert that there is no "AS max_" or any label of any kind. + eq_(str(select([not_named_max()])), "SELECT max(a)") + + nnm = not_named_max() + self._test_keyed_targeting_no_label_at_all(nnm) + + def test_keyed_targeting_no_label_at_all_text(self): + t1 = text("max(a)") + t2 = text("min(a)") + + stmt = select([t1, t2]).select_from(self.tables.keyed1) + row = testing.db.execute(stmt).first() + + eq_(row[t1], "a1") + eq_(row[t2], "a1") + @testing.requires.duplicate_names_in_cursor_description def test_keyed_accessor_composite_conflict_2(self): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 row = testing.db.execute(select([keyed1, keyed2])).first() - # row.b is unambiguous - eq_(row.b, "b2") + + # column access is unambiguous + eq_(row[self.tables.keyed2.c.b], "b2") + # row.a is ambiguous assert_raises_message( exc.InvalidRequestError, "Ambig", getattr, row, "a" ) + # for "b" we have kind of a choice. the name "b" is not ambiguous in + # cursor.description in this case. It is however ambiguous as far as + # the objects we have queried against, because keyed1.c.a has key="b" + # and keyed1.c.b is "b". historically this was allowed as + # non-ambiguous, however the column it targets changes based on + # whether or not the dupe is present so it's ambiguous + # eq_(row.b, "b2") + assert_raises_message( + exc.InvalidRequestError, "Ambig", getattr, row, "b" + ) + + # illustrate why row.b above is ambiguous, and not "b2"; because + # if we didn't have keyed2, now it matches row.a. a new column + # shouldn't be able to grab the value from a previous column. + row = testing.db.execute(select([keyed1])).first() + eq_(row.b, "a1") + + def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(self): + keyed1 = self.tables.keyed1 + keyed2 = self.tables.keyed2 + + row = testing.db.execute( + select([keyed1, keyed2]).apply_labels() + ).first() + + # column access is unambiguous + eq_(row[self.tables.keyed2.c.b], "b2") + + eq_(row["keyed2_b"], "b2") + eq_(row["keyed1_a"], "a1") + def test_keyed_accessor_composite_names_precedent(self): keyed1 = self.tables.keyed1 keyed4 = self.tables.keyed4 @@ -1374,13 +1472,13 @@ class KeyTargetingTest(fixtures.TablesTest): row = testing.db.execute(select([keyed1, keyed3])).first() eq_(row.q, "c1") - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name 'a'", - getattr, - row, - "b", - ) + + # prior to 1.4 #4887, this raised an "ambiguous column name 'a'"" + # message, because "b" is linked to "a" which is a dupe. but we know + # where "b" is in the row by position. + eq_(row.b, "a1") + + # "a" is of course ambiguous assert_raises_message( exc.InvalidRequestError, "Ambiguous column name 'a'", @@ -1406,6 +1504,67 @@ class KeyTargetingTest(fixtures.TablesTest): assert_raises(KeyError, lambda: row["keyed2_c"]) assert_raises(KeyError, lambda: row["keyed2_q"]) + def test_keyed_accessor_column_is_repeated_multiple_times(self): + # test new logic added as a result of the combination of #4892 and + # #4887. We allow duplicate columns, but we also have special logic + # to disambiguate for the same column repeated, and as #4887 adds + # stricter ambiguous result column logic, the compiler has to know to + # not add these dupe columns to the result map, else they register as + # ambiguous. + + keyed2 = self.tables.keyed2 + keyed3 = self.tables.keyed3 + + stmt = select( + [ + keyed2.c.a, + keyed3.c.a, + keyed2.c.a, + keyed2.c.a, + keyed3.c.a, + keyed3.c.a, + keyed3.c.d, + keyed3.c.d, + ] + ).apply_labels() + + result = testing.db.execute(stmt) + is_false(result._metadata.matched_on_name) + + # ensure the result map is the same number of cols so we can + # use positional targeting + eq_( + [rec[0] for rec in result.context.compiled._result_columns], + [ + "keyed2_a", + "keyed3_a", + "keyed2_a__1", + "keyed2_a__1", + "keyed3_a__1", + "keyed3_a__1", + "keyed3_d", + "keyed3_d__1", + ], + ) + row = result.first() + + # keyed access will ignore the dupe cols + eq_(row[keyed2.c.a], "a2") + eq_(row[keyed3.c.a], "a3") + eq_(result._getter(keyed3.c.a)(row), "a3") + eq_(row[keyed3.c.d], "d3") + + # however we can get everything positionally + eq_(row, ("a2", "a3", "a2", "a2", "a3", "a3", "d3", "d3")) + eq_(row[0], "a2") + eq_(row[1], "a3") + eq_(row[2], "a2") + eq_(row[3], "a2") + eq_(row[4], "a3") + eq_(row[5], "a3") + eq_(row[6], "d3") + eq_(row[7], "d3") + def test_columnclause_schema_column_one(self): # originally addressed by [ticket:2932], however liberalized # Column-targeting rules are deprecated diff --git a/test/sql/test_text.py b/test/sql/test_text.py index 6af2cffcf..3f12d06a9 100644 --- a/test/sql/test_text.py +++ b/test/sql/test_text.py @@ -20,6 +20,7 @@ from sqlalchemy import union from sqlalchemy import util from sqlalchemy.sql import column from sqlalchemy.sql import quoted_name +from sqlalchemy.sql import sqltypes from sqlalchemy.sql import table from sqlalchemy.sql import util as sql_util from sqlalchemy.testing import assert_raises_message @@ -49,6 +50,19 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "select * from foo where lala = bar", ) + def test_text_adds_to_result_map(self): + t1, t2 = text("t1"), text("t2") + + stmt = select([t1, t2]) + compiled = stmt.compile() + eq_( + compiled._result_columns, + [ + (None, None, (t1,), sqltypes.NULLTYPE), + (None, None, (t2,), sqltypes.NULLTYPE), + ], + ) + class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL): |
