diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-10-01 17:38:41 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-10-04 15:58:29 -0400 |
| commit | 485216dea6d7a5814d200b4f14b8a363ed0f8caa (patch) | |
| tree | 18885a1c53b59f36a75f6507b33747b8852605e7 /test/sql | |
| parent | 60e64a2c35e7e5a0125c5fefbf0caf531eeb2eda (diff) | |
| download | sqlalchemy-485216dea6d7a5814d200b4f14b8a363ed0f8caa.tar.gz | |
Deprecate textual column matching in Row
Deprecate query.instances() without a context
Deprecate string alias with contains_eager()
Deprecated the behavior by which a :class:`.Column` can be used as the key
in a result set row lookup, when that :class:`.Column` is not part of the
SQL selectable that is being selected; that is, it is only matched on name.
A deprecation warning is now emitted for this case. Various ORM use
cases, such as those involving :func:`.text` constructs, have been improved
so that this fallback logic is avoided in most cases.
Calling the :meth:`.Query.instances` method without passing a
:class:`.QueryContext` is deprecated. The original use case for this was
that a :class:`.Query` could yield ORM objects when given only the entities
to be selected as well as a DBAPI cursor object. However, for this to work
correctly there is essential metadata that is passed from a SQLAlchemy
:class:`.ResultProxy` that is derived from the mapped column expressions,
which comes originally from the :class:`.QueryContext`. To retrieve ORM
results from arbitrary SELECT statements, the :meth:`.Query.from_statement`
method should be used.
Note there is a small bump in test_zoomark because the
column._label is being calculated for each of those columns within
baseline_3_properties, as it is now part of the result map.
This label can't be calculated when the column is attached
to the table because it needs to have all the columns present
to do this correctly. Another approach here would be to
pre-load the _label before the test runs however the zoomark
tests don't have an easy place for this to happen and it's
not really worth it.
Fixes: #4877
Fixes: #4719
Change-Id: I9bd29e72e6dce7c855651d69ba68d7383469acbc
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_compiler.py | 39 | ||||
| -rw-r--r-- | test/sql/test_deprecations.py | 505 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 179 | ||||
| -rw-r--r-- | test/sql/test_text.py | 10 |
4 files changed, 614 insertions, 119 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index cda22a5ac..36e9cd33b 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -4466,8 +4466,8 @@ class ResultMapTest(fixtures.TestBase): eq_( comp._create_result_map(), { - "a": ("a", (t.c.a, "a", "a"), t.c.a.type), - "b": ("b", (t.c.b, "b", "b"), t.c.b.type), + "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type), + "b": ("b", (t.c.b, "b", "b", "t_b"), t.c.b.type), }, ) @@ -4478,7 +4478,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp._create_result_map(), - {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, + {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)}, ) def test_compound_only_top_populates(self): @@ -4487,7 +4487,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp._create_result_map(), - {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, + {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)}, ) def test_label_plus_element(self): @@ -4500,7 +4500,7 @@ class ResultMapTest(fixtures.TestBase): eq_( comp._create_result_map(), { - "a": ("a", (t.c.a, "a", "a"), t.c.a.type), + "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type), "bar": ("bar", (l1, "bar"), l1.type), "anon_1": ( tc.anon_label, @@ -4541,7 +4541,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {"a": ("a", (aint, "a", "a"), aint.type)}, + {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)}, ) def test_insert_from_select(self): @@ -4557,7 +4557,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {"a": ("a", (aint, "a", "a"), aint.type)}, + {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)}, ) def test_nested_api(self): @@ -4596,12 +4596,22 @@ class ResultMapTest(fixtures.TestBase): { "otherid": ( "otherid", - (table2.c.otherid, "otherid", "otherid"), + ( + table2.c.otherid, + "otherid", + "otherid", + "myothertable_otherid", + ), table2.c.otherid.type, ), "othername": ( "othername", - (table2.c.othername, "othername", "othername"), + ( + table2.c.othername, + "othername", + "othername", + "myothertable_othername", + ), table2.c.othername.type, ), "k1": ("k1", (1, 2, 3), int_), @@ -4612,18 +4622,23 @@ class ResultMapTest(fixtures.TestBase): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ), "k2": ("k2", (3, 4, 5), int_), "name": ( "name", - (table1.c.name, "name", "name"), + (table1.c.name, "name", "name", "mytable_name"), table1.c.name.type, ), "description": ( "description", - (table1.c.description, "description", "description"), + ( + table1.c.description, + "description", + "description", + "mytable_description", + ), table1.c.description.type, ), }, diff --git a/test/sql/test_deprecations.py b/test/sql/test_deprecations.py index a75de3f11..bac0a7413 100644 --- a/test/sql/test_deprecations.py +++ b/test/sql/test_deprecations.py @@ -2,12 +2,13 @@ from sqlalchemy import alias from sqlalchemy import bindparam -from sqlalchemy import Column +from sqlalchemy import CHAR from sqlalchemy import column from sqlalchemy import create_engine from sqlalchemy import exc from sqlalchemy import ForeignKey from sqlalchemy import func +from sqlalchemy import INT from sqlalchemy import Integer from sqlalchemy import join from sqlalchemy import literal_column @@ -16,11 +17,11 @@ from sqlalchemy import null from sqlalchemy import select from sqlalchemy import sql from sqlalchemy import String -from sqlalchemy import Table from sqlalchemy import table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import util +from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.schema import DDL from sqlalchemy.sql import coercions @@ -35,8 +36,12 @@ from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures +from sqlalchemy.testing import in_ from sqlalchemy.testing import is_true from sqlalchemy.testing import mock +from sqlalchemy.testing import not_in_ +from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import Table class DeprecationWarningsTest(fixtures.TestBase): @@ -730,7 +735,7 @@ class TextTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, @@ -993,7 +998,7 @@ class TextualSelectTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, @@ -1124,3 +1129,495 @@ class DeprecatedAppendMethTest(fixtures.TestBase, AssertsCompiledSQL): with self._expect_deprecated("Select", "from", "select_from"): stmt.append_from(t1.join(t2, t1.c.q == t2.c.q)) self.assert_compile(stmt, "SELECT t1.q FROM t1 JOIN t2 ON t1.q = t2.q") + + +class KeyTargetingTest(fixtures.TablesTest): + run_inserts = "once" + run_deletes = None + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "keyed1", + metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), + ) + Table("keyed2", metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) + Table("keyed3", metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) + Table("keyed4", metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) + Table("content", metadata, Column("t", String(30), key="type")) + Table("bar", metadata, Column("ctype", String(30), key="content_type")) + + if testing.requires.schemas.enabled: + Table( + "wschema", + metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), + schema=testing.config.test_schema, + ) + + @classmethod + def insert_data(cls): + cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) + cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) + cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) + cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) + cls.tables.content.insert().execute(type="t1") + + if testing.requires.schemas.enabled: + cls.tables[ + "%s.wschema" % testing.config.test_schema + ].insert().execute(dict(b="a1", q="c1")) + + def test_column_label_overlap_fallback(self): + content, bar = self.tables.content, self.tables.bar + row = testing.db.execute( + select([content.c.type.label("content_type")]) + ).first() + + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + def test_columnclause_schema_column_one(self): + keyed2 = self.tables.keyed2 + + # this is addressed by [ticket:2932] + # ColumnClause._compare_name_for_result allows the + # columns which the statement is against to be lightweight + # cols, which results in a more liberal comparison scheme + a, b = sql.column("a"), sql.column("b") + stmt = select([a, b]).select_from(table("keyed2")) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + + def test_columnclause_schema_column_two(self): + keyed2 = self.tables.keyed2 + + a, b = sql.column("a"), sql.column("b") + stmt = select([keyed2.c.a, keyed2.c.b]) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(b, row) + + def test_columnclause_schema_column_three(self): + keyed2 = self.tables.keyed2 + + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated + + a, b = sql.column("a"), sql.column("b") + stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.b, row) + + def test_columnclause_schema_column_four(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + a, b = sql.column("keyed2_a"), sql.column("keyed2_b") + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + a, b + ) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_b, row) + + def test_columnclause_schema_column_five(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + keyed2_a=CHAR, keyed2_b=CHAR + ) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_b, row) + + +class ResultProxyTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + Table( + "addresses", + metadata, + Column( + "address_id", + Integer, + primary_key=True, + test_needs_autoincrement=True, + ), + Column("user_id", Integer, ForeignKey("users.user_id")), + Column("address", String(30)), + test_needs_acid=True, + ) + + Table( + "users2", + metadata, + Column("user_id", INT, primary_key=True), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + @classmethod + def insert_data(cls): + users = cls.tables.users + + with testing.db.connect() as conn: + conn.execute( + users.insert(), + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), + ) + + def test_column_accessor_textual_select(self): + users = self.tables.users + + # this will create column() objects inside + # the select(), these need to match on name anyway + r = testing.db.execute( + select([column("user_id"), column("user_name")]) + .select_from(table("users")) + .where(text("user_id=2")) + ).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_id], 2) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_name], "jack") + + def test_column_accessor_basic_text(self): + users = self.tables.users + + r = testing.db.execute( + text("select * from users where user_id=2") + ).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_id], 2) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_name], "jack") + + @testing.provide_metadata + def test_column_label_overlap_fallback(self): + content = Table("content", self.metadata, Column("type", String(30))) + bar = Table("bar", self.metadata, Column("content_type", String(30))) + self.metadata.create_all(testing.db) + testing.db.execute(content.insert().values(type="t1")) + + row = testing.db.execute(content.select(use_labels=True)).first() + in_(content.c.type, row) + not_in_(bar.c.content_type, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([content.c.type.label("content_type")]) + ).first() + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() + + not_in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + def test_pickled_rows(self): + users = self.tables.users + addresses = self.tables.addresses + with testing.db.connect() as conn: + conn.execute(users.delete()) + conn.execute( + users.insert(), + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": "fred"}, + ) + + for pickle in False, True: + for use_labels in False, True: + result = ( + users.select(use_labels=use_labels) + .order_by(users.c.user_id) + .execute() + .fetchall() + ) + + if pickle: + result = util.pickle.loads(util.pickle.dumps(result)) + + if pickle: + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][users.c.user_id], 7) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][users.c.user_name], "jack") + + if not pickle or use_labels: + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.user_id], + ) + else: + # test with a different table. name resolution is + # causing 'user_id' to match when use_labels wasn't used. + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][addresses.c.user_id], 7) + + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.address_id], + ) + + +class PositionalTextTest(fixtures.TablesTest): + run_inserts = "once" + run_deletes = None + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "text1", + metadata, + Column("a", CHAR(2)), + Column("b", CHAR(2)), + Column("c", CHAR(2)), + Column("d", CHAR(2)), + ) + + @classmethod + def insert_data(cls): + cls.tables.text1.insert().execute( + [dict(a="a1", b="b1", c="c1", d="d1")] + ) + + def test_anon_aliased_overlapping(self): + text1 = self.tables.text1 + + c1 = text1.c.a.label(None) + c2 = text1.alias().c.a + c3 = text1.alias().c.a.label(None) + c4 = text1.c.a.label(None) + + stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) + result = testing.db.execute(stmt) + row = result.first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.a], "a1") + + def test_anon_aliased_unique(self): + text1 = self.tables.text1 + + c1 = text1.c.a.label(None) + c2 = text1.alias().c.c + c3 = text1.alias().c.b + c4 = text1.alias().c.d.label(None) + + stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) + result = testing.db.execute(stmt) + row = result.first() + + eq_(row[c1], "a1") + eq_(row[c2], "b1") + eq_(row[c3], "c1") + eq_(row[c4], "d1") + + # key fallback rules still match this to a column + # unambiguously based on its name + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.a], "a1") + + # key fallback rules still match this to a column + # unambiguously based on its name + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.d], "d1") + + # text1.c.b goes nowhere....because we hit key fallback + # but the text1.c.b doesn't derive from text1.c.c + assert_raises_message( + exc.NoSuchColumnError, + "Could not locate column in row for column 'text1.b'", + lambda: row[text1.c.b], + ) diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 36d442ed7..35353671c 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -26,6 +26,7 @@ from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row +from sqlalchemy.sql.selectable import TextualSelect from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import assertions @@ -192,29 +193,16 @@ class ResultProxyTest(fixtures.TablesTest): row = testing.db.execute(content.select(use_labels=True)).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - - row = testing.db.execute( - select([content.c.type.label("content_type")]) - ).first() - in_(content.c.type, row) - - not_in_(bar.c.content_type, row) - - in_(sql.column("content_type"), row) row = testing.db.execute( select([func.now().label("content_type")]) ).first() - not_in_(content.c.type, row) + not_in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - def test_pickled_rows(self): users = self.tables.users - addresses = self.tables.addresses users.insert().execute( {"user_id": 7, "user_name": "jack"}, @@ -246,26 +234,10 @@ class ResultProxyTest(fixtures.TablesTest): eq_(list(result[0].keys()), ["user_id", "user_name"]) eq_(result[0][0], 7) - eq_(result[0][users.c.user_id], 7) - eq_(result[0][users.c.user_name], "jack") - - if not pickle or use_labels: - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.user_id], - ) - else: - # test with a different table. name resolution is - # causing 'user_id' to match when use_labels wasn't used. - eq_(result[0][addresses.c.user_id], 7) assert_raises( exc.NoSuchColumnError, lambda: result[0]["fake key"] ) - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.address_id], - ) def test_column_error_printing(self): result = testing.db.execute(select([1])) @@ -350,11 +322,9 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_id, 2) eq_(r["user_id"], 2) - eq_(r[users.c.user_id], 2) eq_(r.user_name, "jack") eq_(r["user_name"], "jack") - eq_(r[users.c.user_name], "jack") def test_column_accessor_textual_select(self): users = self.tables.users @@ -373,11 +343,9 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_id, 2) eq_(r["user_id"], 2) - eq_(r[users.c.user_id], 2) eq_(r.user_name, "jack") eq_(r["user_name"], "jack") - eq_(r[users.c.user_name], "jack") def test_column_accessor_dotted_union(self): users = self.tables.users @@ -849,6 +817,81 @@ class ResultProxyTest(fixtures.TablesTest): set([True]), ) + def test_loose_matching_one(self): + users = self.tables.users + addresses = self.tables.addresses + + with testing.db.connect() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + conn.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = conn.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.address_id AS address_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [ + users.c.user_id, + users.c.user_name, + addresses.c.address_id, + ], + positional=False, + ) + ) + row = result.first() + eq_(row[users.c.user_id], 1) + eq_(row[users.c.user_name], "john") + + def test_loose_matching_two(self): + users = self.tables.users + addresses = self.tables.addresses + + with testing.db.connect() as conn: + # MARKMARK + conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + conn.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = conn.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.user_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [users.c.user_id, users.c.user_name, addresses.c.user_id], + positional=False, + ) + ) + row = result.first() + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[users.c.user_id], + ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[addresses.c.user_id], + ) + eq_(row[users.c.user_name], "john") + def test_ambiguous_column_by_col_plus_label(self): users = self.tables.users @@ -1363,79 +1406,37 @@ class KeyTargetingTest(fixtures.TablesTest): assert_raises(KeyError, lambda: row["keyed2_c"]) assert_raises(KeyError, lambda: row["keyed2_q"]) - def test_column_label_overlap_fallback(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute( - select([content.c.type.label("content_type")]) - ).first() - - not_in_(content.c.type, row) - not_in_(bar.c.content_type, row) - - in_(sql.column("content_type"), row) - - row = testing.db.execute( - select([func.now().label("content_type")]) - ).first() - not_in_(content.c.type, row) - not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - - def test_column_label_overlap_fallback_2(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute(content.select(use_labels=True)).first() - in_(content.c.type, row) - not_in_(bar.c.content_type, row) - not_in_(sql.column("content_type"), row) - def test_columnclause_schema_column_one(self): - keyed2 = self.tables.keyed2 - - # this is addressed by [ticket:2932] - # ColumnClause._compare_name_for_result allows the - # columns which the statement is against to be lightweight - # cols, which results in a more liberal comparison scheme + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated a, b = sql.column("a"), sql.column("b") stmt = select([a, b]).select_from(table("keyed2")) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(a, row) in_(b, row) def test_columnclause_schema_column_two(self): keyed2 = self.tables.keyed2 - a, b = sql.column("a"), sql.column("b") stmt = select([keyed2.c.a, keyed2.c.b]) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) - in_(a, row) - in_(b, row) def test_columnclause_schema_column_three(self): - keyed2 = self.tables.keyed2 - # this is also addressed by [ticket:2932] - a, b = sql.column("a"), sql.column("b") stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) - in_(a, row) - in_(b, row) in_(stmt.selected_columns.a, row) in_(stmt.selected_columns.b, row) def test_columnclause_schema_column_four(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated a, b = sql.column("keyed2_a"), sql.column("keyed2_b") stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( @@ -1443,16 +1444,12 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(a, row) in_(b, row) in_(stmt.selected_columns.keyed2_a, row) in_(stmt.selected_columns.keyed2_b, row) def test_columnclause_schema_column_five(self): - keyed2 = self.tables.keyed2 - # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( @@ -1460,8 +1457,6 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(stmt.selected_columns.keyed2_a, row) in_(stmt.selected_columns.keyed2_b, row) @@ -1577,14 +1572,6 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row[c3], "c1") eq_(row[c4], "d1") - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.a], "a1") - - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.d], "d1") - # text1.c.b goes nowhere....because we hit key fallback # but the text1.c.b doesn't derive from text1.c.c assert_raises_message( @@ -1610,10 +1597,6 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row[c3], "c1") eq_(row[c4], "d1") - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.a], "a1") - def test_anon_aliased_name_conflict(self): text1 = self.tables.text1 diff --git a/test/sql/test_text.py b/test/sql/test_text.py index bec56f2f7..6af2cffcf 100644 --- a/test/sql/test_text.py +++ b/test/sql/test_text.py @@ -427,12 +427,12 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "id": ( "id", - (t.selected_columns.id, "id", "id"), + (t.selected_columns.id, "id", "id", "id"), t.selected_columns.id.type, ), "name": ( "name", - (t.selected_columns.name, "name", "name"), + (t.selected_columns.name, "name", "name", "name"), t.selected_columns.name.type, ), }, @@ -447,12 +447,12 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "id": ( "id", - (t.selected_columns.id, "id", "id"), + (t.selected_columns.id, "id", "id", "id"), t.selected_columns.id.type, ), "name": ( "name", - (t.selected_columns.name, "name", "name"), + (t.selected_columns.name, "name", "name", "name"), t.selected_columns.name.type, ), }, @@ -474,7 +474,7 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, |
