diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/orm/test_deprecations.py | 232 | ||||
| -rw-r--r-- | test/orm/test_froms.py | 174 | ||||
| -rw-r--r-- | test/orm/test_mapper.py | 8 | ||||
| -rw-r--r-- | test/orm/test_pickled.py | 2 | ||||
| -rw-r--r-- | test/orm/test_query.py | 110 | ||||
| -rw-r--r-- | test/profiles.txt | 8 | ||||
| -rw-r--r-- | test/sql/test_compiler.py | 39 | ||||
| -rw-r--r-- | test/sql/test_deprecations.py | 505 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 179 | ||||
| -rw-r--r-- | test/sql/test_text.py | 10 |
10 files changed, 985 insertions, 282 deletions
diff --git a/test/orm/test_deprecations.py b/test/orm/test_deprecations.py index f70566c8c..76cfb8667 100644 --- a/test/orm/test_deprecations.py +++ b/test/orm/test_deprecations.py @@ -19,6 +19,7 @@ from sqlalchemy.orm import column_property from sqlalchemy.orm import comparable_property from sqlalchemy.orm import composite from sqlalchemy.orm import configure_mappers +from sqlalchemy.orm import contains_alias from sqlalchemy.orm import contains_eager from sqlalchemy.orm import create_session from sqlalchemy.orm import defer @@ -55,6 +56,7 @@ from sqlalchemy.util.compat import pypy from . import _fixtures from .inheritance import _poly_fixtures from .test_options import PathTest as OptionsPathTest +from .test_query import QueryTest from .test_transaction import _LocalFixture @@ -2976,3 +2978,233 @@ class NonPrimaryMapperTest(_fixtures.FixtureTest, AssertsCompiledSQL): addresses, non_primary=True, ) + + +class InstancesTest(QueryTest, AssertsCompiledSQL): + def test_from_alias_one(self): + User, addresses, users = ( + self.classes.User, + self.tables.addresses, + self.tables.users, + ) + + query = ( + users.select(users.c.id == 7) + .union(users.select(users.c.id > 7)) + .alias("ulist") + .outerjoin(addresses) + .select( + use_labels=True, order_by=[text("ulist.id"), addresses.c.id] + ) + ) + sess = create_session() + q = sess.query(User) + + # note this has multiple problems because we aren't giving Query + # the statement where it would be able to create an adapter + def go(): + with testing.expect_deprecated( + r"Using the Query.instances\(\) method without a context", + "Retreiving row values using Column objects with only " + "matching names", + ): + result = list( + q.options( + contains_alias("ulist"), contains_eager("addresses") + ).instances(query.execute()) + ) + assert self.static.user_address_result == result + + self.assert_sql_count(testing.db, go, 1) + + def test_contains_eager(self): + users, addresses, User = ( + self.tables.users, + self.tables.addresses, + self.classes.User, + ) + + sess = create_session() + + selectquery = users.outerjoin(addresses).select( + users.c.id < 10, + use_labels=True, + order_by=[users.c.id, addresses.c.id], + ) + q = sess.query(User) + + def go(): + with testing.expect_deprecated( + r"Using the Query.instances\(\) method without a context" + ): + result = list( + q.options(contains_eager("addresses")).instances( + selectquery.execute() + ) + ) + assert self.static.user_address_result[0:3] == result + + self.assert_sql_count(testing.db, go, 1) + + sess.expunge_all() + + def go(): + with testing.expect_deprecated( + r"Using the Query.instances\(\) method without a context" + ): + result = list( + q.options(contains_eager(User.addresses)).instances( + selectquery.execute() + ) + ) + assert self.static.user_address_result[0:3] == result + + self.assert_sql_count(testing.db, go, 1) + + def test_contains_eager_string_alias(self): + addresses, users, User = ( + self.tables.addresses, + self.tables.users, + self.classes.User, + ) + + sess = create_session() + q = sess.query(User) + + adalias = addresses.alias("adalias") + selectquery = users.outerjoin(adalias).select( + use_labels=True, order_by=[users.c.id, adalias.c.id] + ) + + # note this has multiple problems because we aren't giving Query + # the statement where it would be able to create an adapter + def go(): + with testing.expect_deprecated( + r"Using the Query.instances\(\) method without a context", + r"Passing a string name for the 'alias' argument to " + r"'contains_eager\(\)` is deprecated", + "Retreiving row values using Column objects with only " + "matching names", + ): + result = list( + q.options( + contains_eager("addresses", alias="adalias") + ).instances(selectquery.execute()) + ) + assert self.static.user_address_result == result + + self.assert_sql_count(testing.db, go, 1) + + def test_contains_eager_aliased_instances(self): + addresses, users, User = ( + self.tables.addresses, + self.tables.users, + self.classes.User, + ) + + sess = create_session() + q = sess.query(User) + + adalias = addresses.alias("adalias") + selectquery = users.outerjoin(adalias).select( + use_labels=True, order_by=[users.c.id, adalias.c.id] + ) + + # note this has multiple problems because we aren't giving Query + # the statement where it would be able to create an adapter + def go(): + with testing.expect_deprecated( + r"Using the Query.instances\(\) method without a context" + ): + result = list( + q.options( + contains_eager("addresses", alias=adalias) + ).instances(selectquery.execute()) + ) + assert self.static.user_address_result == result + + self.assert_sql_count(testing.db, go, 1) + + def test_contains_eager_multi_string_alias(self): + orders, items, users, order_items, User = ( + self.tables.orders, + self.tables.items, + self.tables.users, + self.tables.order_items, + self.classes.User, + ) + + sess = create_session() + q = sess.query(User) + + oalias = orders.alias("o1") + ialias = items.alias("i1") + query = ( + users.outerjoin(oalias) + .outerjoin(order_items) + .outerjoin(ialias) + .select(use_labels=True) + .order_by(users.c.id, oalias.c.id, ialias.c.id) + ) + + # test using string alias with more than one level deep + def go(): + with testing.expect_deprecated( + r"Using the Query.instances\(\) method without a context", + r"Passing a string name for the 'alias' argument to " + r"'contains_eager\(\)` is deprecated", + "Retreiving row values using Column objects with only " + "matching names", + ): + result = list( + q.options( + contains_eager("orders", alias="o1"), + contains_eager("orders.items", alias="i1"), + ).instances(query.execute()) + ) + assert self.static.user_order_result == result + + self.assert_sql_count(testing.db, go, 1) + + def test_contains_eager_multi_alias(self): + orders, items, users, order_items, User = ( + self.tables.orders, + self.tables.items, + self.tables.users, + self.tables.order_items, + self.classes.User, + ) + + sess = create_session() + q = sess.query(User) + + oalias = orders.alias("o1") + ialias = items.alias("i1") + query = ( + users.outerjoin(oalias) + .outerjoin(order_items) + .outerjoin(ialias) + .select(use_labels=True) + .order_by(users.c.id, oalias.c.id, ialias.c.id) + ) + + # test using Alias with more than one level deep + + # new way: + # from sqlalchemy.orm.strategy_options import Load + # opt = Load(User).contains_eager('orders', alias=oalias). + # contains_eager('items', alias=ialias) + + def go(): + with testing.expect_deprecated( + r"Using the Query.instances\(\) method without a context" + ): + result = list( + q.options( + contains_eager("orders", alias=oalias), + contains_eager("orders.items", alias=ialias), + ).instances(query.execute()) + ) + assert self.static.user_order_result == result + + self.assert_sql_count(testing.db, go, 1) diff --git a/test/orm/test_froms.py b/test/orm/test_froms.py index efa45affa..736e27c14 100644 --- a/test/orm/test_froms.py +++ b/test/orm/test_froms.py @@ -852,35 +852,6 @@ class AddEntityEquivalenceTest(fixtures.MappedTest, AssertsCompiledSQL): class InstancesTest(QueryTest, AssertsCompiledSQL): - def test_from_alias_one(self): - User, addresses, users = ( - self.classes.User, - self.tables.addresses, - self.tables.users, - ) - - query = ( - users.select(users.c.id == 7) - .union(users.select(users.c.id > 7)) - .alias("ulist") - .outerjoin(addresses) - .select( - use_labels=True, order_by=[text("ulist.id"), addresses.c.id] - ) - ) - sess = create_session() - q = sess.query(User) - - def go(): - result = list( - q.options( - contains_alias("ulist"), contains_eager("addresses") - ).instances(query.execute()) - ) - assert self.static.user_address_result == result - - self.assert_sql_count(testing.db, go, 1) - def test_from_alias_two(self): User, addresses, users = ( self.classes.User, @@ -972,12 +943,8 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.assert_sql_count(testing.db, go, 1) - def test_contains_eager(self): - users, addresses, User = ( - self.tables.users, - self.tables.addresses, - self.classes.User, - ) + def test_contains_eager_one(self): + addresses, User = (self.tables.addresses, self.classes.User) sess = create_session() @@ -1005,7 +972,15 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): assert self.static.user_address_result == q.all() self.assert_sql_count(testing.db, go, 1) - sess.expunge_all() + + def test_contains_eager_two(self): + users, addresses, User = ( + self.tables.users, + self.tables.addresses, + self.classes.User, + ) + + sess = create_session() adalias = addresses.alias() q = ( @@ -1019,37 +994,23 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): eq_(self.static.user_address_result, q.all()) self.assert_sql_count(testing.db, go, 1) - sess.expunge_all() + + def test_contains_eager_four(self): + users, addresses, User = ( + self.tables.users, + self.tables.addresses, + self.classes.User, + ) + + sess = create_session() selectquery = users.outerjoin(addresses).select( users.c.id < 10, use_labels=True, order_by=[users.c.id, addresses.c.id], ) - q = sess.query(User) - - def go(): - result = list( - q.options(contains_eager("addresses")).instances( - selectquery.execute() - ) - ) - assert self.static.user_address_result[0:3] == result - self.assert_sql_count(testing.db, go, 1) - - sess.expunge_all() - - def go(): - result = list( - q.options(contains_eager(User.addresses)).instances( - selectquery.execute() - ) - ) - assert self.static.user_address_result[0:3] == result - - self.assert_sql_count(testing.db, go, 1) - sess.expunge_all() + q = sess.query(User) def go(): result = ( @@ -1061,58 +1022,6 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.assert_sql_count(testing.db, go, 1) - def test_contains_eager_string_alias(self): - addresses, users, User = ( - self.tables.addresses, - self.tables.users, - self.classes.User, - ) - - sess = create_session() - q = sess.query(User) - - adalias = addresses.alias("adalias") - selectquery = users.outerjoin(adalias).select( - use_labels=True, order_by=[users.c.id, adalias.c.id] - ) - - # string alias name - def go(): - result = list( - q.options( - contains_eager("addresses", alias="adalias") - ).instances(selectquery.execute()) - ) - assert self.static.user_address_result == result - - self.assert_sql_count(testing.db, go, 1) - - def test_contains_eager_aliased_instances(self): - addresses, users, User = ( - self.tables.addresses, - self.tables.users, - self.classes.User, - ) - - sess = create_session() - q = sess.query(User) - - adalias = addresses.alias("adalias") - selectquery = users.outerjoin(adalias).select( - use_labels=True, order_by=[users.c.id, adalias.c.id] - ) - - # expression.Alias object - def go(): - result = list( - q.options( - contains_eager("addresses", alias=adalias) - ).instances(selectquery.execute()) - ) - assert self.static.user_address_result == result - - self.assert_sql_count(testing.db, go, 1) - def test_contains_eager_aliased(self): User, Address = self.classes.User, self.classes.Address @@ -1132,40 +1041,6 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.assert_sql_count(testing.db, go, 1) - def test_contains_eager_multi_string_alias(self): - orders, items, users, order_items, User = ( - self.tables.orders, - self.tables.items, - self.tables.users, - self.tables.order_items, - self.classes.User, - ) - - sess = create_session() - q = sess.query(User) - - oalias = orders.alias("o1") - ialias = items.alias("i1") - query = ( - users.outerjoin(oalias) - .outerjoin(order_items) - .outerjoin(ialias) - .select(use_labels=True) - .order_by(users.c.id, oalias.c.id, ialias.c.id) - ) - - # test using string alias with more than one level deep - def go(): - result = list( - q.options( - contains_eager("orders", alias="o1"), - contains_eager("orders.items", alias="i1"), - ).instances(query.execute()) - ) - assert self.static.user_order_result == result - - self.assert_sql_count(testing.db, go, 1) - def test_contains_eager_multi_alias(self): orders, items, users, order_items, User = ( self.tables.orders, @@ -1200,7 +1075,7 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): q.options( contains_eager("orders", alias=oalias), contains_eager("orders.items", alias=ialias), - ).instances(query.execute()) + ).from_statement(query) ) assert self.static.user_order_result == result @@ -2098,7 +1973,7 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): use_labels=True, order_by=[users.c.id, addresses.c.id] ) eq_( - list(sess.query(User, Address).instances(selectquery.execute())), + list(sess.query(User, Address).from_statement(selectquery)), expected, ) sess.expunge_all() @@ -2262,8 +2137,9 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): sess.expunge_all() # TODO: figure out why group_by(users) doesn't work here + count = func.count(addresses.c.id).label("count") s = ( - select([users, func.count(addresses.c.id).label("count")]) + select([users, count]) .select_from(users.outerjoin(addresses)) .group_by(*[c for c in users.c]) .order_by(User.id) diff --git a/test/orm/test_mapper.py b/test/orm/test_mapper.py index 93346b32f..2622ea640 100644 --- a/test/orm/test_mapper.py +++ b/test/orm/test_mapper.py @@ -2256,8 +2256,10 @@ class OptionsTest(_fixtures.FixtureTest): # session's identity map) r = users.select().order_by(users.c.id).execute() + ctx = sess.query(User)._compile_context() + def go(): - result = list(sess.query(User).instances(r)) + result = list(sess.query(User).instances(r, ctx)) eq_(result, self.static.user_address_result) self.sql_count_(4, go) @@ -2353,8 +2355,10 @@ class OptionsTest(_fixtures.FixtureTest): # then assert the data, which will launch 6 more lazy loads r = users.select().execute() + ctx = sess.query(User)._compile_context() + def go(): - result = list(sess.query(User).instances(r)) + result = list(sess.query(User).instances(r, ctx)) eq_(result, self.static.user_all_result) self.assert_sql_count(testing.db, go, 6) diff --git a/test/orm/test_pickled.py b/test/orm/test_pickled.py index 399c881ac..72a68c42b 100644 --- a/test/orm/test_pickled.py +++ b/test/orm/test_pickled.py @@ -793,8 +793,6 @@ class TupleLabelTest(_fixtures.FixtureTest): eq_(row.User, row[0]) eq_(row.orders, row[1]) - # test here that first col is not labeled, only - # one name in keys, matches correctly for row in sess.query(User.name + "hoho", User.name): eq_(list(row.keys()), ["name"]) eq_(row[0], row.name + "hoho") diff --git a/test/orm/test_query.py b/test/orm/test_query.py index bcd13e6e2..7c4811dc7 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -36,6 +36,7 @@ from sqlalchemy.orm import attributes from sqlalchemy.orm import backref from sqlalchemy.orm import Bundle from sqlalchemy.orm import column_property +from sqlalchemy.orm import contains_eager from sqlalchemy.orm import create_session from sqlalchemy.orm import defer from sqlalchemy.orm import joinedload @@ -63,6 +64,7 @@ from sqlalchemy.testing.assertions import expect_warnings from sqlalchemy.testing.assertsql import CompiledSQL from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table +from sqlalchemy.util import collections_abc from test.orm import _fixtures @@ -99,7 +101,7 @@ class OnlyReturnTuplesTest(QueryTest): def test_single_entity_true(self): User = self.classes.User row = create_session().query(User).only_return_tuples(True).first() - assert isinstance(row, tuple) + assert isinstance(row, collections_abc.Sequence) def test_multiple_entity_false(self): User = self.classes.User @@ -109,7 +111,7 @@ class OnlyReturnTuplesTest(QueryTest): .only_return_tuples(False) .first() ) - assert isinstance(row, tuple) + assert isinstance(row, collections_abc.Sequence) def test_multiple_entity_true(self): User = self.classes.User @@ -119,7 +121,7 @@ class OnlyReturnTuplesTest(QueryTest): .only_return_tuples(True) .first() ) - assert isinstance(row, tuple) + assert isinstance(row, collections_abc.Sequence) class RowTupleTest(QueryTest): @@ -1762,7 +1764,9 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): ) result = list( - session.query(User).instances(s.execute(emailad="jack@bean.com")) + session.query(User) + .params(emailad="jack@bean.com") + .from_statement(s) ) eq_([User(id=7)], result) @@ -4168,7 +4172,7 @@ class HintsTest(QueryTest, AssertsCompiledSQL): class TextTest(QueryTest, AssertsCompiledSQL): __dialect__ = "default" - def test_fulltext(self): + def test_needs_text(self): User = self.classes.User assert_raises_message( @@ -4178,6 +4182,9 @@ class TextTest(QueryTest, AssertsCompiledSQL): "select * from users order by id", ) + def test_select_star(self): + User = self.classes.User + eq_( create_session() .query(User) @@ -4195,6 +4202,97 @@ class TextTest(QueryTest, AssertsCompiledSQL): None, ) + def test_columns_mismatched(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter + User = self.classes.User + + s = create_session() + q = s.query(User).from_statement( + text( + "select name, 27 as foo, id as users_id from users order by id" + ) + ) + eq_( + q.all(), + [ + User(id=7, name="jack"), + User(id=8, name="ed"), + User(id=9, name="fred"), + User(id=10, name="chuck"), + ], + ) + + def test_columns_multi_table_uselabels(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = create_session() + q = s.query(User, Address).from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ) + ) + + eq_( + q.all(), + [ + (User(id=8), Address(id=2)), + (User(id=8), Address(id=3)), + (User(id=8), Address(id=4)), + ], + ) + + def test_columns_multi_table_uselabels_contains_eager(self): + # test that columns using column._label match, as well as that + # ordering doesn't matter. + User = self.classes.User + Address = self.classes.Address + + s = create_session() + q = ( + s.query(User) + .from_statement( + text( + "select users.name AS users_name, users.id AS users_id, " + "addresses.id AS addresses_id FROM users JOIN addresses " + "ON users.id = addresses.user_id WHERE users.id=8 " + "ORDER BY addresses.id" + ) + ) + .options(contains_eager(User.addresses)) + ) + + def go(): + r = q.all() + eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)]) + + self.assert_sql_count(testing.db, go, 1) + + def test_other_eager_loads(self): + # this is new in 1.4. with textclause, we build up column loaders + # normally, so that eager loaders also get installed. previously, + # _compile_context() didn't build up column loaders and attempted + # to get them after the fact. + User = self.classes.User + + s = create_session() + q = ( + s.query(User) + .from_statement(text("select * from users ORDER BY users.id")) + .options(subqueryload(User.addresses)) + ) + + def go(): + eq_(q.all(), self.static.user_address_result) + + self.assert_sql_count(testing.db, go, 2) + def test_whereclause(self): User = self.classes.User @@ -4231,7 +4329,7 @@ class TextTest(QueryTest, AssertsCompiledSQL): "id in (:id1, :id2)", ) - def test_as_column(self): + def test_plain_textual_column(self): User = self.classes.User s = create_session() diff --git a/test/profiles.txt b/test/profiles.txt index 980b47634..721ce9677 100644 --- a/test/profiles.txt +++ b/test/profiles.txt @@ -999,10 +999,10 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.7_sqlite_pysqlite # TEST: test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation -test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_cextensions 6412,322,3969,13151,1340,2187,2770 -test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 6429,322,3969,13149,1341,2187,2766 -test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_cextensions 6177,306,3889,12597,1233,2133,2650 -test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_nocextensions 6260,306,3969,13203,1344,2151,2840 +test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_cextensions 6412,322,4242,13151,1340,2187,2770 +test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 6429,322,4242,13149,1341,2187,2766 +test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_cextensions 6177,306,4162,12597,1233,2133,2650 +test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_nocextensions 6260,306,4242,13203,1344,2151,2840 # TEST: test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index cda22a5ac..36e9cd33b 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -4466,8 +4466,8 @@ class ResultMapTest(fixtures.TestBase): eq_( comp._create_result_map(), { - "a": ("a", (t.c.a, "a", "a"), t.c.a.type), - "b": ("b", (t.c.b, "b", "b"), t.c.b.type), + "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type), + "b": ("b", (t.c.b, "b", "b", "t_b"), t.c.b.type), }, ) @@ -4478,7 +4478,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp._create_result_map(), - {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, + {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)}, ) def test_compound_only_top_populates(self): @@ -4487,7 +4487,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp._create_result_map(), - {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, + {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)}, ) def test_label_plus_element(self): @@ -4500,7 +4500,7 @@ class ResultMapTest(fixtures.TestBase): eq_( comp._create_result_map(), { - "a": ("a", (t.c.a, "a", "a"), t.c.a.type), + "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type), "bar": ("bar", (l1, "bar"), l1.type), "anon_1": ( tc.anon_label, @@ -4541,7 +4541,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {"a": ("a", (aint, "a", "a"), aint.type)}, + {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)}, ) def test_insert_from_select(self): @@ -4557,7 +4557,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {"a": ("a", (aint, "a", "a"), aint.type)}, + {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)}, ) def test_nested_api(self): @@ -4596,12 +4596,22 @@ class ResultMapTest(fixtures.TestBase): { "otherid": ( "otherid", - (table2.c.otherid, "otherid", "otherid"), + ( + table2.c.otherid, + "otherid", + "otherid", + "myothertable_otherid", + ), table2.c.otherid.type, ), "othername": ( "othername", - (table2.c.othername, "othername", "othername"), + ( + table2.c.othername, + "othername", + "othername", + "myothertable_othername", + ), table2.c.othername.type, ), "k1": ("k1", (1, 2, 3), int_), @@ -4612,18 +4622,23 @@ class ResultMapTest(fixtures.TestBase): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ), "k2": ("k2", (3, 4, 5), int_), "name": ( "name", - (table1.c.name, "name", "name"), + (table1.c.name, "name", "name", "mytable_name"), table1.c.name.type, ), "description": ( "description", - (table1.c.description, "description", "description"), + ( + table1.c.description, + "description", + "description", + "mytable_description", + ), table1.c.description.type, ), }, diff --git a/test/sql/test_deprecations.py b/test/sql/test_deprecations.py index a75de3f11..bac0a7413 100644 --- a/test/sql/test_deprecations.py +++ b/test/sql/test_deprecations.py @@ -2,12 +2,13 @@ from sqlalchemy import alias from sqlalchemy import bindparam -from sqlalchemy import Column +from sqlalchemy import CHAR from sqlalchemy import column from sqlalchemy import create_engine from sqlalchemy import exc from sqlalchemy import ForeignKey from sqlalchemy import func +from sqlalchemy import INT from sqlalchemy import Integer from sqlalchemy import join from sqlalchemy import literal_column @@ -16,11 +17,11 @@ from sqlalchemy import null from sqlalchemy import select from sqlalchemy import sql from sqlalchemy import String -from sqlalchemy import Table from sqlalchemy import table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import util +from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.schema import DDL from sqlalchemy.sql import coercions @@ -35,8 +36,12 @@ from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures +from sqlalchemy.testing import in_ from sqlalchemy.testing import is_true from sqlalchemy.testing import mock +from sqlalchemy.testing import not_in_ +from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import Table class DeprecationWarningsTest(fixtures.TestBase): @@ -730,7 +735,7 @@ class TextTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, @@ -993,7 +998,7 @@ class TextualSelectTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, @@ -1124,3 +1129,495 @@ class DeprecatedAppendMethTest(fixtures.TestBase, AssertsCompiledSQL): with self._expect_deprecated("Select", "from", "select_from"): stmt.append_from(t1.join(t2, t1.c.q == t2.c.q)) self.assert_compile(stmt, "SELECT t1.q FROM t1 JOIN t2 ON t1.q = t2.q") + + +class KeyTargetingTest(fixtures.TablesTest): + run_inserts = "once" + run_deletes = None + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "keyed1", + metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), + ) + Table("keyed2", metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) + Table("keyed3", metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) + Table("keyed4", metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) + Table("content", metadata, Column("t", String(30), key="type")) + Table("bar", metadata, Column("ctype", String(30), key="content_type")) + + if testing.requires.schemas.enabled: + Table( + "wschema", + metadata, + Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q"), + schema=testing.config.test_schema, + ) + + @classmethod + def insert_data(cls): + cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) + cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) + cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) + cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) + cls.tables.content.insert().execute(type="t1") + + if testing.requires.schemas.enabled: + cls.tables[ + "%s.wschema" % testing.config.test_schema + ].insert().execute(dict(b="a1", q="c1")) + + def test_column_label_overlap_fallback(self): + content, bar = self.tables.content, self.tables.bar + row = testing.db.execute( + select([content.c.type.label("content_type")]) + ).first() + + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() + not_in_(content.c.type, row) + not_in_(bar.c.content_type, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + def test_columnclause_schema_column_one(self): + keyed2 = self.tables.keyed2 + + # this is addressed by [ticket:2932] + # ColumnClause._compare_name_for_result allows the + # columns which the statement is against to be lightweight + # cols, which results in a more liberal comparison scheme + a, b = sql.column("a"), sql.column("b") + stmt = select([a, b]).select_from(table("keyed2")) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + + def test_columnclause_schema_column_two(self): + keyed2 = self.tables.keyed2 + + a, b = sql.column("a"), sql.column("b") + stmt = select([keyed2.c.a, keyed2.c.b]) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(b, row) + + def test_columnclause_schema_column_three(self): + keyed2 = self.tables.keyed2 + + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated + + a, b = sql.column("a"), sql.column("b") + stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.b, row) + + def test_columnclause_schema_column_four(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + a, b = sql.column("keyed2_a"), sql.column("keyed2_b") + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + a, b + ) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_b, row) + + def test_columnclause_schema_column_five(self): + keyed2 = self.tables.keyed2 + + # this is also addressed by [ticket:2932] + + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + keyed2_a=CHAR, keyed2_b=CHAR + ) + row = testing.db.execute(stmt).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(keyed2.c.b, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_a, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names", + "The SelectBase.c and SelectBase.columns", + ): + in_(stmt.c.keyed2_b, row) + + +class ResultProxyTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + Table( + "addresses", + metadata, + Column( + "address_id", + Integer, + primary_key=True, + test_needs_autoincrement=True, + ), + Column("user_id", Integer, ForeignKey("users.user_id")), + Column("address", String(30)), + test_needs_acid=True, + ) + + Table( + "users2", + metadata, + Column("user_id", INT, primary_key=True), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + @classmethod + def insert_data(cls): + users = cls.tables.users + + with testing.db.connect() as conn: + conn.execute( + users.insert(), + dict(user_id=1, user_name="john"), + dict(user_id=2, user_name="jack"), + ) + + def test_column_accessor_textual_select(self): + users = self.tables.users + + # this will create column() objects inside + # the select(), these need to match on name anyway + r = testing.db.execute( + select([column("user_id"), column("user_name")]) + .select_from(table("users")) + .where(text("user_id=2")) + ).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_id], 2) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_name], "jack") + + def test_column_accessor_basic_text(self): + users = self.tables.users + + r = testing.db.execute( + text("select * from users where user_id=2") + ).first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_id], 2) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(r[users.c.user_name], "jack") + + @testing.provide_metadata + def test_column_label_overlap_fallback(self): + content = Table("content", self.metadata, Column("type", String(30))) + bar = Table("bar", self.metadata, Column("content_type", String(30))) + self.metadata.create_all(testing.db) + testing.db.execute(content.insert().values(type="t1")) + + row = testing.db.execute(content.select(use_labels=True)).first() + in_(content.c.type, row) + not_in_(bar.c.content_type, row) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([content.c.type.label("content_type")]) + ).first() + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + row = testing.db.execute( + select([func.now().label("content_type")]) + ).first() + + not_in_(content.c.type, row) + + not_in_(bar.c.content_type, row) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + in_(sql.column("content_type"), row) + + def test_pickled_rows(self): + users = self.tables.users + addresses = self.tables.addresses + with testing.db.connect() as conn: + conn.execute(users.delete()) + conn.execute( + users.insert(), + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": "fred"}, + ) + + for pickle in False, True: + for use_labels in False, True: + result = ( + users.select(use_labels=use_labels) + .order_by(users.c.user_id) + .execute() + .fetchall() + ) + + if pickle: + result = util.pickle.loads(util.pickle.dumps(result)) + + if pickle: + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][users.c.user_id], 7) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][users.c.user_name], "jack") + + if not pickle or use_labels: + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.user_id], + ) + else: + # test with a different table. name resolution is + # causing 'user_id' to match when use_labels wasn't used. + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0][addresses.c.user_id], 7) + + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.address_id], + ) + + +class PositionalTextTest(fixtures.TablesTest): + run_inserts = "once" + run_deletes = None + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "text1", + metadata, + Column("a", CHAR(2)), + Column("b", CHAR(2)), + Column("c", CHAR(2)), + Column("d", CHAR(2)), + ) + + @classmethod + def insert_data(cls): + cls.tables.text1.insert().execute( + [dict(a="a1", b="b1", c="c1", d="d1")] + ) + + def test_anon_aliased_overlapping(self): + text1 = self.tables.text1 + + c1 = text1.c.a.label(None) + c2 = text1.alias().c.a + c3 = text1.alias().c.a.label(None) + c4 = text1.c.a.label(None) + + stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) + result = testing.db.execute(stmt) + row = result.first() + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.a], "a1") + + def test_anon_aliased_unique(self): + text1 = self.tables.text1 + + c1 = text1.c.a.label(None) + c2 = text1.alias().c.c + c3 = text1.alias().c.b + c4 = text1.alias().c.d.label(None) + + stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) + result = testing.db.execute(stmt) + row = result.first() + + eq_(row[c1], "a1") + eq_(row[c2], "b1") + eq_(row[c3], "c1") + eq_(row[c4], "d1") + + # key fallback rules still match this to a column + # unambiguously based on its name + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.a], "a1") + + # key fallback rules still match this to a column + # unambiguously based on its name + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "with only matching names" + ): + eq_(row[text1.c.d], "d1") + + # text1.c.b goes nowhere....because we hit key fallback + # but the text1.c.b doesn't derive from text1.c.c + assert_raises_message( + exc.NoSuchColumnError, + "Could not locate column in row for column 'text1.b'", + lambda: row[text1.c.b], + ) diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 36d442ed7..35353671c 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -26,6 +26,7 @@ from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row +from sqlalchemy.sql.selectable import TextualSelect from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import assertions @@ -192,29 +193,16 @@ class ResultProxyTest(fixtures.TablesTest): row = testing.db.execute(content.select(use_labels=True)).first() in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - - row = testing.db.execute( - select([content.c.type.label("content_type")]) - ).first() - in_(content.c.type, row) - - not_in_(bar.c.content_type, row) - - in_(sql.column("content_type"), row) row = testing.db.execute( select([func.now().label("content_type")]) ).first() - not_in_(content.c.type, row) + not_in_(content.c.type, row) not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - def test_pickled_rows(self): users = self.tables.users - addresses = self.tables.addresses users.insert().execute( {"user_id": 7, "user_name": "jack"}, @@ -246,26 +234,10 @@ class ResultProxyTest(fixtures.TablesTest): eq_(list(result[0].keys()), ["user_id", "user_name"]) eq_(result[0][0], 7) - eq_(result[0][users.c.user_id], 7) - eq_(result[0][users.c.user_name], "jack") - - if not pickle or use_labels: - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.user_id], - ) - else: - # test with a different table. name resolution is - # causing 'user_id' to match when use_labels wasn't used. - eq_(result[0][addresses.c.user_id], 7) assert_raises( exc.NoSuchColumnError, lambda: result[0]["fake key"] ) - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.address_id], - ) def test_column_error_printing(self): result = testing.db.execute(select([1])) @@ -350,11 +322,9 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_id, 2) eq_(r["user_id"], 2) - eq_(r[users.c.user_id], 2) eq_(r.user_name, "jack") eq_(r["user_name"], "jack") - eq_(r[users.c.user_name], "jack") def test_column_accessor_textual_select(self): users = self.tables.users @@ -373,11 +343,9 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_id, 2) eq_(r["user_id"], 2) - eq_(r[users.c.user_id], 2) eq_(r.user_name, "jack") eq_(r["user_name"], "jack") - eq_(r[users.c.user_name], "jack") def test_column_accessor_dotted_union(self): users = self.tables.users @@ -849,6 +817,81 @@ class ResultProxyTest(fixtures.TablesTest): set([True]), ) + def test_loose_matching_one(self): + users = self.tables.users + addresses = self.tables.addresses + + with testing.db.connect() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + conn.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = conn.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.address_id AS address_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [ + users.c.user_id, + users.c.user_name, + addresses.c.address_id, + ], + positional=False, + ) + ) + row = result.first() + eq_(row[users.c.user_id], 1) + eq_(row[users.c.user_name], "john") + + def test_loose_matching_two(self): + users = self.tables.users + addresses = self.tables.addresses + + with testing.db.connect() as conn: + # MARKMARK + conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + conn.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = conn.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.user_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [users.c.user_id, users.c.user_name, addresses.c.user_id], + positional=False, + ) + ) + row = result.first() + + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[users.c.user_id], + ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row[addresses.c.user_id], + ) + eq_(row[users.c.user_name], "john") + def test_ambiguous_column_by_col_plus_label(self): users = self.tables.users @@ -1363,79 +1406,37 @@ class KeyTargetingTest(fixtures.TablesTest): assert_raises(KeyError, lambda: row["keyed2_c"]) assert_raises(KeyError, lambda: row["keyed2_q"]) - def test_column_label_overlap_fallback(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute( - select([content.c.type.label("content_type")]) - ).first() - - not_in_(content.c.type, row) - not_in_(bar.c.content_type, row) - - in_(sql.column("content_type"), row) - - row = testing.db.execute( - select([func.now().label("content_type")]) - ).first() - not_in_(content.c.type, row) - not_in_(bar.c.content_type, row) - in_(sql.column("content_type"), row) - - def test_column_label_overlap_fallback_2(self): - content, bar = self.tables.content, self.tables.bar - row = testing.db.execute(content.select(use_labels=True)).first() - in_(content.c.type, row) - not_in_(bar.c.content_type, row) - not_in_(sql.column("content_type"), row) - def test_columnclause_schema_column_one(self): - keyed2 = self.tables.keyed2 - - # this is addressed by [ticket:2932] - # ColumnClause._compare_name_for_result allows the - # columns which the statement is against to be lightweight - # cols, which results in a more liberal comparison scheme + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated a, b = sql.column("a"), sql.column("b") stmt = select([a, b]).select_from(table("keyed2")) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(a, row) in_(b, row) def test_columnclause_schema_column_two(self): keyed2 = self.tables.keyed2 - a, b = sql.column("a"), sql.column("b") stmt = select([keyed2.c.a, keyed2.c.b]) row = testing.db.execute(stmt).first() in_(keyed2.c.a, row) in_(keyed2.c.b, row) - in_(a, row) - in_(b, row) def test_columnclause_schema_column_three(self): - keyed2 = self.tables.keyed2 - # this is also addressed by [ticket:2932] - a, b = sql.column("a"), sql.column("b") stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) - in_(a, row) - in_(b, row) in_(stmt.selected_columns.a, row) in_(stmt.selected_columns.b, row) def test_columnclause_schema_column_four(self): - keyed2 = self.tables.keyed2 - - # this is also addressed by [ticket:2932] + # originally addressed by [ticket:2932], however liberalized + # Column-targeting rules are deprecated a, b = sql.column("keyed2_a"), sql.column("keyed2_b") stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( @@ -1443,16 +1444,12 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(a, row) in_(b, row) in_(stmt.selected_columns.keyed2_a, row) in_(stmt.selected_columns.keyed2_b, row) def test_columnclause_schema_column_five(self): - keyed2 = self.tables.keyed2 - # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( @@ -1460,8 +1457,6 @@ class KeyTargetingTest(fixtures.TablesTest): ) row = testing.db.execute(stmt).first() - in_(keyed2.c.a, row) - in_(keyed2.c.b, row) in_(stmt.selected_columns.keyed2_a, row) in_(stmt.selected_columns.keyed2_b, row) @@ -1577,14 +1572,6 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row[c3], "c1") eq_(row[c4], "d1") - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.a], "a1") - - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.d], "d1") - # text1.c.b goes nowhere....because we hit key fallback # but the text1.c.b doesn't derive from text1.c.c assert_raises_message( @@ -1610,10 +1597,6 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row[c3], "c1") eq_(row[c4], "d1") - # key fallback rules still match this to a column - # unambiguously based on its name - eq_(row[text1.c.a], "a1") - def test_anon_aliased_name_conflict(self): text1 = self.tables.text1 diff --git a/test/sql/test_text.py b/test/sql/test_text.py index bec56f2f7..6af2cffcf 100644 --- a/test/sql/test_text.py +++ b/test/sql/test_text.py @@ -427,12 +427,12 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "id": ( "id", - (t.selected_columns.id, "id", "id"), + (t.selected_columns.id, "id", "id", "id"), t.selected_columns.id.type, ), "name": ( "name", - (t.selected_columns.name, "name", "name"), + (t.selected_columns.name, "name", "name", "name"), t.selected_columns.name.type, ), }, @@ -447,12 +447,12 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "id": ( "id", - (t.selected_columns.id, "id", "id"), + (t.selected_columns.id, "id", "id", "id"), t.selected_columns.id.type, ), "name": ( "name", - (t.selected_columns.name, "name", "name"), + (t.selected_columns.name, "name", "name", "name"), t.selected_columns.name.type, ), }, @@ -474,7 +474,7 @@ class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): { "myid": ( "myid", - (table1.c.myid, "myid", "myid"), + (table1.c.myid, "myid", "myid", "mytable_myid"), table1.c.myid.type, ) }, |
