summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_functions.py15
-rw-r--r--test/sql/test_resultset.py179
-rw-r--r--test/sql/test_text.py14
3 files changed, 198 insertions, 10 deletions
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index d0e68f1e3..a46d1af54 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -99,6 +99,21 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
select([func.foo()], use_labels=True), "SELECT foo() AS foo_1"
)
+ def test_use_labels_function_element(self):
+ from sqlalchemy.ext.compiler import compiles
+
+ class max_(FunctionElement):
+ name = "max"
+
+ @compiles(max_)
+ def visit_max(element, compiler, **kw):
+ return "max(%s)" % compiler.process(element.clauses, **kw)
+
+ self.assert_compile(
+ select([max_(5, 6)], use_labels=True),
+ "SELECT max(:max_2, :max_3) AS max_1",
+ )
+
def test_underscores(self):
self.assert_compile(func.if_(), "if()")
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 35353671c..2563c7d0c 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -26,7 +26,10 @@ from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
from sqlalchemy.engine import result as _result
from sqlalchemy.engine import Row
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy.sql import expression
from sqlalchemy.sql.selectable import TextualSelect
+from sqlalchemy.sql.sqltypes import NULLTYPE
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import assertions
@@ -35,6 +38,7 @@ from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import le_
from sqlalchemy.testing import ne_
@@ -721,6 +725,13 @@ class ResultProxyTest(fixtures.TablesTest):
lambda: r["user_id"],
)
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ result._getter,
+ "user_id",
+ )
+
# pure positional targeting; users.c.user_id
# and addresses.c.user_id are known!
# works as of 1.1 issue #3501
@@ -856,7 +867,6 @@ class ResultProxyTest(fixtures.TablesTest):
addresses = self.tables.addresses
with testing.db.connect() as conn:
- # MARKMARK
conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
conn.execute(
addresses.insert(),
@@ -1344,19 +1354,107 @@ class KeyTargetingTest(fixtures.TablesTest):
eq_(row.keyed1_a, "a1")
eq_(row.keyed1_c, "c1")
+ def _test_keyed_targeting_no_label_at_all(self, expression):
+ lt = literal_column("2")
+ stmt = select([literal_column("1"), expression, lt]).select_from(
+ self.tables.keyed1
+ )
+ row = testing.db.execute(stmt).first()
+
+ eq_(row[expression], "a1")
+ eq_(row[lt], 2)
+
+ # Postgresql for example has the key as "?column?", which dupes
+ # easily. we get around that because we know that "2" is unique
+ eq_(row["2"], 2)
+
+ def test_keyed_targeting_no_label_at_all_one(self):
+ class not_named_max(expression.ColumnElement):
+ name = "not_named_max"
+
+ @compiles(not_named_max)
+ def visit_max(element, compiler, **kw):
+ # explicit add
+ kw["add_to_result_map"](None, None, (element,), NULLTYPE)
+ return "max(a)"
+
+ # assert that there is no "AS max_" or any label of any kind.
+ eq_(str(select([not_named_max()])), "SELECT max(a)")
+
+ nnm = not_named_max()
+ self._test_keyed_targeting_no_label_at_all(nnm)
+
+ def test_keyed_targeting_no_label_at_all_two(self):
+ class not_named_max(expression.ColumnElement):
+ name = "not_named_max"
+
+ @compiles(not_named_max)
+ def visit_max(element, compiler, **kw):
+ # we don't add to keymap here; compiler should be doing it
+ return "max(a)"
+
+ # assert that there is no "AS max_" or any label of any kind.
+ eq_(str(select([not_named_max()])), "SELECT max(a)")
+
+ nnm = not_named_max()
+ self._test_keyed_targeting_no_label_at_all(nnm)
+
+ def test_keyed_targeting_no_label_at_all_text(self):
+ t1 = text("max(a)")
+ t2 = text("min(a)")
+
+ stmt = select([t1, t2]).select_from(self.tables.keyed1)
+ row = testing.db.execute(stmt).first()
+
+ eq_(row[t1], "a1")
+ eq_(row[t2], "a1")
+
@testing.requires.duplicate_names_in_cursor_description
def test_keyed_accessor_composite_conflict_2(self):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
row = testing.db.execute(select([keyed1, keyed2])).first()
- # row.b is unambiguous
- eq_(row.b, "b2")
+
+ # column access is unambiguous
+ eq_(row[self.tables.keyed2.c.b], "b2")
+
# row.a is ambiguous
assert_raises_message(
exc.InvalidRequestError, "Ambig", getattr, row, "a"
)
+ # for "b" we have kind of a choice. the name "b" is not ambiguous in
+ # cursor.description in this case. It is however ambiguous as far as
+ # the objects we have queried against, because keyed1.c.a has key="b"
+ # and keyed1.c.b is "b". historically this was allowed as
+ # non-ambiguous, however the column it targets changes based on
+ # whether or not the dupe is present so it's ambiguous
+ # eq_(row.b, "b2")
+ assert_raises_message(
+ exc.InvalidRequestError, "Ambig", getattr, row, "b"
+ )
+
+ # illustrate why row.b above is ambiguous, and not "b2"; because
+ # if we didn't have keyed2, now it matches row.a. a new column
+ # shouldn't be able to grab the value from a previous column.
+ row = testing.db.execute(select([keyed1])).first()
+ eq_(row.b, "a1")
+
+ def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(self):
+ keyed1 = self.tables.keyed1
+ keyed2 = self.tables.keyed2
+
+ row = testing.db.execute(
+ select([keyed1, keyed2]).apply_labels()
+ ).first()
+
+ # column access is unambiguous
+ eq_(row[self.tables.keyed2.c.b], "b2")
+
+ eq_(row["keyed2_b"], "b2")
+ eq_(row["keyed1_a"], "a1")
+
def test_keyed_accessor_composite_names_precedent(self):
keyed1 = self.tables.keyed1
keyed4 = self.tables.keyed4
@@ -1374,13 +1472,13 @@ class KeyTargetingTest(fixtures.TablesTest):
row = testing.db.execute(select([keyed1, keyed3])).first()
eq_(row.q, "c1")
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name 'a'",
- getattr,
- row,
- "b",
- )
+
+ # prior to 1.4 #4887, this raised an "ambiguous column name 'a'""
+ # message, because "b" is linked to "a" which is a dupe. but we know
+ # where "b" is in the row by position.
+ eq_(row.b, "a1")
+
+ # "a" is of course ambiguous
assert_raises_message(
exc.InvalidRequestError,
"Ambiguous column name 'a'",
@@ -1406,6 +1504,67 @@ class KeyTargetingTest(fixtures.TablesTest):
assert_raises(KeyError, lambda: row["keyed2_c"])
assert_raises(KeyError, lambda: row["keyed2_q"])
+ def test_keyed_accessor_column_is_repeated_multiple_times(self):
+ # test new logic added as a result of the combination of #4892 and
+ # #4887. We allow duplicate columns, but we also have special logic
+ # to disambiguate for the same column repeated, and as #4887 adds
+ # stricter ambiguous result column logic, the compiler has to know to
+ # not add these dupe columns to the result map, else they register as
+ # ambiguous.
+
+ keyed2 = self.tables.keyed2
+ keyed3 = self.tables.keyed3
+
+ stmt = select(
+ [
+ keyed2.c.a,
+ keyed3.c.a,
+ keyed2.c.a,
+ keyed2.c.a,
+ keyed3.c.a,
+ keyed3.c.a,
+ keyed3.c.d,
+ keyed3.c.d,
+ ]
+ ).apply_labels()
+
+ result = testing.db.execute(stmt)
+ is_false(result._metadata.matched_on_name)
+
+ # ensure the result map is the same number of cols so we can
+ # use positional targeting
+ eq_(
+ [rec[0] for rec in result.context.compiled._result_columns],
+ [
+ "keyed2_a",
+ "keyed3_a",
+ "keyed2_a__1",
+ "keyed2_a__1",
+ "keyed3_a__1",
+ "keyed3_a__1",
+ "keyed3_d",
+ "keyed3_d__1",
+ ],
+ )
+ row = result.first()
+
+ # keyed access will ignore the dupe cols
+ eq_(row[keyed2.c.a], "a2")
+ eq_(row[keyed3.c.a], "a3")
+ eq_(result._getter(keyed3.c.a)(row), "a3")
+ eq_(row[keyed3.c.d], "d3")
+
+ # however we can get everything positionally
+ eq_(row, ("a2", "a3", "a2", "a2", "a3", "a3", "d3", "d3"))
+ eq_(row[0], "a2")
+ eq_(row[1], "a3")
+ eq_(row[2], "a2")
+ eq_(row[3], "a2")
+ eq_(row[4], "a3")
+ eq_(row[5], "a3")
+ eq_(row[6], "d3")
+ eq_(row[7], "d3")
+
def test_columnclause_schema_column_one(self):
# originally addressed by [ticket:2932], however liberalized
# Column-targeting rules are deprecated
diff --git a/test/sql/test_text.py b/test/sql/test_text.py
index 6af2cffcf..3f12d06a9 100644
--- a/test/sql/test_text.py
+++ b/test/sql/test_text.py
@@ -20,6 +20,7 @@ from sqlalchemy import union
from sqlalchemy import util
from sqlalchemy.sql import column
from sqlalchemy.sql import quoted_name
+from sqlalchemy.sql import sqltypes
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.testing import assert_raises_message
@@ -49,6 +50,19 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"select * from foo where lala = bar",
)
+ def test_text_adds_to_result_map(self):
+ t1, t2 = text("t1"), text("t2")
+
+ stmt = select([t1, t2])
+ compiled = stmt.compile()
+ eq_(
+ compiled._result_columns,
+ [
+ (None, None, (t1,), sqltypes.NULLTYPE),
+ (None, None, (t2,), sqltypes.NULLTYPE),
+ ],
+ )
+
class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL):