diff options
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_compiler.py | 39 | ||||
| -rw-r--r-- | test/sql/test_deprecations.py | 505 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 179 | ||||
| -rw-r--r-- | test/sql/test_text.py | 10 |
4 files changed, 614 insertions, 119 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index cda22a5ac..36e9cd33b 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -4466,8 +4466,8 @@ class ResultMapTest(fixtures.TestBase): eq_( comp._create_result_map(), { - "a": ("a", (t.c.a, "a", "a"), t.c.a.type), - "b": ("b", (t.c.b, "b", "b"), t.c.b.type), + "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type), + "b": ("b", (t.c.b, "b", "b", "t_b"), t.c.b.type), }, ) @@ -4478,7 +4478,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp._create_result_map(), - {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, + {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)}, ) def test_compound_only_top_populates(self): @@ -4487,7 +4487,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp._create_result_map(), - {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, + {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)}, ) def test_label_plus_element(self): @@ -4500,7 +4500,7 @@ class ResultMapTest(fixtures.TestBase): eq_( comp._create_result_map(), { - "a": ("a", (t.c.a, "a", "a"), t.c.a.type), + "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type), "bar": ("bar", (l1, "bar"), l1.type), "anon_1": ( tc.anon_label, @@ -4541,7 +4541,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {"a": ("a", (aint, "a", "a"), aint.type)}, + {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)}, ) def test_insert_from_select(self): @@ -4557,7 +4557,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {"a": ("a", (aint, "a", "a"), aint.type)}, + {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)}, ) def test_nested_api(self): @@ -4596,12 +4596,22 @@ class ResultMapTest(fixtures.TestBase): { "otherid": ( "otherid", - (table2.c.otherid, "otherid", "otherid"), + ( + table2.c.otherid, + "otherid", + "otherid", + "myothertable_otherid", + ), table2.c.otherid.type, ), "othername": ( "othername", - (table2.c.othername, "othername", "othername"), + ( + table2.c.othername, + "othername", + "othername", + "myothertable_othername", + ), table2.c.othername.type, ), "k1": ("k1", (1, 2, 3), int_), @@ -4612,18 +4622,23 @@ class ResultMapTest(fixtures.TestBase): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ), "k2": ("k2", (3, 4, 5), int_), "name": ( "name", - (table1.c.name, "name", "name"), + (table1.c.name, "name", "name", "mytable_name"), table1.c.name.type, ), "description": ( "description", - (table1.c.description, "description", "description"), + ( + table1.c.description, + "description", + "description", + "mytable_description", + ), table1.c.description.type, ), }, diff --git a/test/sql/test_deprecations.py b/test/sql/test_deprecations.py index a75de3f11..bac0a7413 100644 --- a/test/sql/test_deprecations.py +++ b/test/sql/test_deprecations.py @@ -2,12 +2,13 @@ from sqlalchemy import alias from sqlalchemy import bindparam -from sqlalchemy import Column +from sqlalchemy import CHAR from sqlalchemy import column from sqlalchemy import create_engine from sqlalchemy import exc from sqlalchemy import ForeignKey from sqlalchemy import func +from sqlalchemy import INT from sqlalchemy import Integer from sqlalchemy import join from sqlalchemy import literal_column @@ -16,11 +17,11 @@ from sqlalchemy import null from sqlalchemy import select from sqlalchemy import sql from sqlalchemy import String -from sqlalchemy import Table from sqlalchemy import table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import util +from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.schema import DDL from sqlalchemy.sql import coercions @@ -35,8 +36,12 @@ from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures +from sqlalchemy.testing import in_ from sqlalchemy.testing import is_true from sqlalchemy.testing import mock +from sqlalchemy.testing import not_in_ +from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import Table class DeprecationWarningsTest(fixtures.TestBase): @@ -730,7 +735,7 @@ class TextTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, @@ -993,7 +998,7 @@ class TextualSelectTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, @@ -1124,3 +1129,495 @@ class DeprecatedAppendMethTest(fixtures.TestBase, AssertsCompiledSQL): with self._expect_deprecated("Select", "from", "select_from"): stmt.append_from(t1.join(t2, t1.c.q == t2.c.q)) self.assert_compile(stmt, "SELECT t1.q FROM t1 JOIN t2 ON t1.q = t2.q") + + +class KeyTargetingTest(fixtures.TablesTest): + run_inserts = "once" + run_deletes = None + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "keyed1", + metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), + ) + Table("keyed2", metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) + Table("keyed3", metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) + Table("keyed4", metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) + Table("content", metadata, Column("t", String(30), key="type")) + Table("bar", metadata, Column("ctype", String(30), key="content_type")) + + if testing.requires.schemas.enabled: + Table( + "wschema", + metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), + schema=testing.config.test_schema, + ) + + @classmethod + def insert_data(cls): + cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) + cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) + cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) + cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) + cls.tables.content.insert().execute(type="t1") + + if testing.requires.schemas.enabled: + cls.tables[ + "%s.wschema" % testing.config.test_schema + ].insert().execute(dict(b="a1", q="c1")) + + def test_column_label_overlap_fallback(self): + content, bar = self.tables.content, self.tables.bar + row = testing.db.execute( + select([content.c.type.label("content_type")]) + ).first() + + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + def test_columnclause_schema_column_one(self): + keyed2 = self.tables.keyed2 + + # this is addressed by [ticket:2932] + # ColumnClause._compare_name_for_result allows the + # columns which the statement is against to be lightweight + # cols, which results in a more liberal comparison scheme + a, b = sql.column("a"), sql.column("b") + stmt = select([a, b]).select_from(table("keyed2")) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + + def test_columnclause_schema_column_two(self): + keyed2 = self.tables.keyed2 + + a, b = sql.column("a"), sql.column("b") + stmt = select([keyed2.c.a, keyed2.c.b]) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(b, row) + + def test_columnclause_schema_column_three(self): + keyed2 = self.tables.keyed2 + + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated + + a, b = sql.column("a"), sql.column("b") + stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.b, row) + + def test_columnclause_schema_column_four(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + a, b = sql.column("keyed2_a"), sql.column("keyed2_b") + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + a, b + ) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_b, row) + + def test_columnclause_schema_column_five(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + keyed2_a=CHAR, keyed2_b=CHAR + ) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_b, row) + + +class ResultProxyTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + Table( + "addresses", + metadata, + Column( + "address_id", + Integer, + primary_key=True, + test_needs_autoincrement=True, + ), + Column("user_id", Integer, ForeignKey("users.user_id")), + Column("address", String(30)), + test_needs_acid=True, + ) + + Table( + "users2", + metadata, + Column("user_id", INT, primary_key=True), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + @classmethod + def insert_data(cls): + users = cls.tables.users + + with testing.db.connect() as conn: + conn.execute( + users.insert(), + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), + ) + + def test_column_accessor_textual_select(self): + users = self.tables.users + + # this will create column() objects inside + # the select(), these need to match on name anyway + r = testing.db.execute( + select([column("user_id"), column("user_name")]) + .select_from(table("users")) + .where(text("user_id=2")) + ).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_id], 2) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_name], "jack") + + def test_column_accessor_basic_text(self): + users = self.tables.users + + r = testing.db.execute( + text("select * from users where user_id=2") + ).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_id], 2) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_name], "jack") + + @testing.provide_metadata + def test_column_label_overlap_fallback(self): + content = Table("content", self.metadata, Column("type", String(30))) + bar = Table("bar", self.metadata, Column("content_type", String(30))) + self.metadata.create_all(testing.db) + testing.db.execute(content.insert().values(type="t1")) + + row = testing.db.execute(content.select(use_labels=True)).first() + in_(content.c.type, row) + not_in_(bar.c.content_type, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([content.c.type.label("content_type")]) + ).first() + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() + + not_in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + def test_pickled_rows(self): + users = self.tables.users + addresses = self.tables.addresses + with testing.db.connect() as conn: + conn.execute(users.delete()) + conn.execute( + users.insert(), + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": "fred"}, + ) + + for pickle in False, True: + for use_labels in False, True: + result = ( + users.select(use_labels=use_labels) + .order_by(users.c.user_id) + .execute() + .fetchall() + ) + + if pickle: + result = util.pickle.loads(util.pickle.dumps(result)) + + if pickle: + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][users.c.user_id], 7) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][users.c.user_name], "jack") + + if not pickle or use_labels: + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.user_id], + ) + else: + # test with a different table. name resolution is + # causing 'user_id' to match when use_labels wasn't used. + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][addresses.c.user_id], 7) + + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.address_id], + ) + + +class PositionalTextTest(fixtures.TablesTest): + run_inserts = "once" + run_deletes = None + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "text1", + metadata, + Column("a", CHAR(2)), + Column("b", CHAR(2)), + Column("c", CHAR(2)), + Column("d", CHAR(2)), + ) + + @classmethod + def insert_data(cls): + cls.tables.text1.insert().execute( + [dict(a="a1", b="b1", c="c1", d="d1")] + ) + + def test_anon_aliased_overlapping(self): + text1 = self.tables.text1 + + c1 = text1.c.a.label(None) + c2 = text1.alias().c.a + c3 = text1.alias().c.a.label(None) + c4 = text1.c.a.label(None) + + stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) + result = testing.db.execute(stmt) + row = result.first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.a], "a1") + + def test_anon_aliased_unique(self): + text1 = self.tables.text1 + + c1 = text1.c.a.label(None) + c2 = text1.alias().c.c + c3 = text1.alias().c.b + c4 = text1.alias().c.d.label(None) + + stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) + result = testing.db.execute(stmt) + row = result.first() + + eq_(row[c1], "a1") + eq_(row[c2], "b1") + eq_(row[c3], "c1") + eq_(row[c4], "d1") + + # key fallback rules still match this to a column + # unambiguously based on its name + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.a], "a1") + + # key fallback rules still match this to a column + # unambiguously based on its name + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.d], "d1") + + # text1.c.b goes nowhere....because we hit key fallback + # but the text1.c.b doesn't derive from text1.c.c + assert_raises_message( + exc.NoSuchColumnError, + "Could not locate column in row for column 'text1.b'", + lambda: row[text1.c.b], + ) diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 36d442ed7..35353671c 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -26,6 +26,7 @@ from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row +from sqlalchemy.sql.selectable import TextualSelect from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import assertions @@ -192,29 +193,16 @@ class ResultProxyTest(fixtures.TablesTest): row = testing.db.execute(content.select(use_labels=True)).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - - row = testing.db.execute( - select([content.c.type.label("content_type")]) - ).first() - in_(content.c.type, row) - - not_in_(bar.c.content_type, row) - - in_(sql.column("content_type"), row) row = testing.db.execute( select([func.now().label("content_type")]) ).first() - not_in_(content.c.type, row) + not_in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - def test_pickled_rows(self): users = self.tables.users - addresses = self.tables.addresses users.insert().execute( {"user_id": 7, "user_name": "jack"}, @@ -246,26 +234,10 @@ class ResultProxyTest(fixtures.TablesTest): eq_(list(result[0].keys()), ["user_id", "user_name"]) eq_(result[0][0], 7) - eq_(result[0][users.c.user_id], 7) - eq_(result[0][users.c.user_name], "jack") - - if not pickle or use_labels: - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.user_id], - ) - else: - # test with a different table. name resolution is - # causing 'user_id' to match when use_labels wasn't used. - eq_(result[0][addresses.c.user_id], 7) assert_raises( exc.NoSuchColumnError, lambda: result[0]["fake key"] ) - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.address_id], - ) def test_column_error_printing(self): result = testing.db.execute(select([1])) @@ -350,11 +322,9 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_id, 2) eq_(r["user_id"], 2) - eq_(r[users.c.user_id], 2) eq_(r.user_name, "jack") eq_(r["user_name"], "jack") - eq_(r[users.c.user_name], "jack") def test_column_accessor_textual_select(self): users = self.tables.users @@ -373,11 +343,9 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_id, 2) eq_(r["user_id"], 2) - eq_(r[users.c.user_id], 2) eq_(r.user_name, "jack") eq_(r["user_name"], "jack") - eq_(r[users.c.user_name], "jack") def test_column_accessor_dotted_union(self): users = self.tables.users @@ -849,6 +817,81 @@ class ResultProxyTest(fixtures.TablesTest): set([True]), ) + def test_loose_matching_one(self): + users = self.tables.users + addresses = self.tables.addresses + + with testing.db.connect() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + conn.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = conn.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.address_id AS address_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [ + users.c.user_id, + users.c.user_name, + addresses.c.address_id, + ], + positional=False, + ) + ) + row = result.first() + eq_(row[users.c.user_id], 1) + eq_(row[users.c.user_name], "john") + + def test_loose_matching_two(self): + users = self.tables.users + addresses = self.tables.addresses + + with testing.db.connect() as conn: + # MARKMARK + conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + conn.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = conn.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.user_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [users.c.user_id, users.c.user_name, addresses.c.user_id], + positional=False, + ) + ) + row = result.first() + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[users.c.user_id], + ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[addresses.c.user_id], + ) + eq_(row[users.c.user_name], "john") + def test_ambiguous_column_by_col_plus_label(self): users = self.tables.users @@ -1363,79 +1406,37 @@ class KeyTargetingTest(fixtures.TablesTest): assert_raises(KeyError, lambda: row["keyed2_c"]) assert_raises(KeyError, lambda: row["keyed2_q"]) - def test_column_label_overlap_fallback(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute( - select([content.c.type.label("content_type")]) - ).first() - - not_in_(content.c.type, row) - not_in_(bar.c.content_type, row) - - in_(sql.column("content_type"), row) - - row = testing.db.execute( - select([func.now().label("content_type")]) - ).first() - not_in_(content.c.type, row) - not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - - def test_column_label_overlap_fallback_2(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute(content.select(use_labels=True)).first() - in_(content.c.type, row) - not_in_(bar.c.content_type, row) - not_in_(sql.column("content_type"), row) - def test_columnclause_schema_column_one(self): - keyed2 = self.tables.keyed2 - - # this is addressed by [ticket:2932] - # ColumnClause._compare_name_for_result allows the - # columns which the statement is against to be lightweight - # cols, which results in a more liberal comparison scheme + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated a, b = sql.column("a"), sql.column("b") stmt = select([a, b]).select_from(table("keyed2")) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(a, row) in_(b, row) def test_columnclause_schema_column_two(self): keyed2 = self.tables.keyed2 - a, b = sql.column("a"), sql.column("b") stmt = select([keyed2.c.a, keyed2.c.b]) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) - in_(a, row) - in_(b, row) def test_columnclause_schema_column_three(self): - keyed2 = self.tables.keyed2 - # this is also addressed by [ticket:2932] - a, b = sql.column("a"), sql.column("b") stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) - in_(a, row) - in_(b, row) in_(stmt.selected_columns.a, row) in_(stmt.selected_columns.b, row) def test_columnclause_schema_column_four(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated a, b = sql.column("keyed2_a"), sql.column("keyed2_b") stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( @@ -1443,16 +1444,12 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(a, row) in_(b, row) in_(stmt.selected_columns.keyed2_a, row) in_(stmt.selected_columns.keyed2_b, row) def test_columnclause_schema_column_five(self): - keyed2 = self.tables.keyed2 - # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( @@ -1460,8 +1457,6 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(stmt.selected_columns.keyed2_a, row) in_(stmt.selected_columns.keyed2_b, row) @@ -1577,14 +1572,6 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row[c3], "c1") eq_(row[c4], "d1") - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.a], "a1") - - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.d], "d1") - # text1.c.b goes nowhere....because we hit key fallback # but the text1.c.b doesn't derive from text1.c.c assert_raises_message( @@ -1610,10 +1597,6 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row[c3], "c1") eq_(row[c4], "d1") - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.a], "a1") - def test_anon_aliased_name_conflict(self): text1 = self.tables.text1 diff --git a/test/sql/test_text.py b/test/sql/test_text.py index bec56f2f7..6af2cffcf 100644 --- a/test/sql/test_text.py +++ b/test/sql/test_text.py @@ -427,12 +427,12 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "id": ( "id", - (t.selected_columns.id, "id", "id"), + (t.selected_columns.id, "id", "id", "id"), t.selected_columns.id.type, ), "name": ( "name", - (t.selected_columns.name, "name", "name"), + (t.selected_columns.name, "name", "name", "name"), t.selected_columns.name.type, ), }, @@ -447,12 +447,12 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "id": ( "id", - (t.selected_columns.id, "id", "id"), + (t.selected_columns.id, "id", "id", "id"), t.selected_columns.id.type, ), "name": ( "name", - (t.selected_columns.name, "name", "name"), + (t.selected_columns.name, "name", "name", "name"), t.selected_columns.name.type, ), }, @@ -474,7 +474,7 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, |
