diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/orm/test_query.py | 188 | ||||
-rw-r--r-- | test/sql/test_generative.py | 264 | ||||
-rw-r--r-- | test/sql/test_selectable.py | 7 |
3 files changed, 444 insertions, 15 deletions
diff --git a/test/orm/test_query.py b/test/orm/test_query.py index a7184fe01..3f6813138 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -14,7 +14,7 @@ from sqlalchemy.testing.schema import Table, Column import sqlalchemy as sa from sqlalchemy.testing.assertions import ( eq_, assert_raises, assert_raises_message, expect_warnings) -from sqlalchemy.testing import fixtures, AssertsCompiledSQL +from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assert_warnings from test.orm import _fixtures from sqlalchemy.orm.util import join, with_parent @@ -1236,21 +1236,23 @@ class ColumnPropertyTest(_fixtures.FixtureTest, AssertsCompiledSQL): __dialect__ = 'default' run_setup_mappers = 'each' - def _fixture(self): + def _fixture(self, label=True): User, Address = self.classes("User", "Address") users, addresses = self.tables("users", "addresses") + stmt = select([func.max(addresses.c.email_address)]).\ + where(addresses.c.user_id == users.c.id).\ + correlate(users) + if label: + stmt = stmt.label("email_ad") + mapper(User, users, properties={ - "ead": column_property( - select([func.max(addresses.c.email_address)]).\ - where(addresses.c.user_id == users.c.id).\ - correlate(users).label("email_ad") - ) + "ead": column_property(stmt) }) mapper(Address, addresses) - def test_order_by_column_prop_label(self): + def test_order_by_column_prop_string(self): User, Address = self.classes("User", "Address") - self._fixture() + self._fixture(label=True) s = Session() q = s.query(User).order_by("email_ad") @@ -1263,9 +1265,169 @@ class ColumnPropertyTest(_fixtures.FixtureTest, AssertsCompiledSQL): "FROM users ORDER BY email_ad" ) - def test_order_by_column_prop_attrname(self): + def test_order_by_column_prop_aliased_string(self): + User, Address = self.classes("User", "Address") + self._fixture(label=True) + + s = Session() + ua = aliased(User) + q = s.query(ua).order_by("email_ad") + + def go(): + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses WHERE addresses.user_id = users_1.id) " + "AS anon_1, users_1.id AS users_1_id, " + "users_1.name AS users_1_name FROM users AS users_1 " + "ORDER BY email_ad" + ) + assert_warnings( + go, + ["Can't resolve label reference 'email_ad'"], regex=True) + + def test_order_by_column_labeled_prop_attr_aliased_one(self): + User = self.classes.User + self._fixture(label=True) + + ua = aliased(User) + s = Session() + q = s.query(ua).order_by(ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " + "users_1.id AS users_1_id, users_1.name AS users_1_name " + "FROM users AS users_1 ORDER BY anon_1" + ) + + def test_order_by_column_labeled_prop_attr_aliased_two(self): + User = self.classes.User + self._fixture(label=True) + + ua = aliased(User) + s = Session() + q = s.query(ua.ead).order_by(ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses, " + "users AS users_1 WHERE addresses.user_id = users_1.id) " + "AS anon_1 ORDER BY anon_1" + ) + + # we're also testing that the state of "ua" is OK after the + # previous call, so the batching into one test is intentional + q = s.query(ua).order_by(ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " + "users_1.id AS users_1_id, users_1.name AS users_1_name " + "FROM users AS users_1 ORDER BY anon_1" + ) + + def test_order_by_column_labeled_prop_attr_aliased_three(self): + User = self.classes.User + self._fixture(label=True) + + ua = aliased(User) + s = Session() + q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses, users WHERE addresses.user_id = users.id) " + "AS email_ad, (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses, users AS users_1 WHERE addresses.user_id = " + "users_1.id) AS anon_1 ORDER BY email_ad, anon_1" + ) + + q = s.query(User, ua).order_by(User.ead, ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses WHERE addresses.user_id = users.id) AS " + "email_ad, users.id AS users_id, users.name AS users_name, " + "(SELECT max(addresses.email_address) AS max_1 FROM addresses " + "WHERE addresses.user_id = users_1.id) AS anon_1, users_1.id " + "AS users_1_id, users_1.name AS users_1_name FROM users, " + "users AS users_1 ORDER BY email_ad, anon_1" + ) + + def test_order_by_column_unlabeled_prop_attr_aliased_one(self): + User = self.classes.User + self._fixture(label=False) + + ua = aliased(User) + s = Session() + q = s.query(ua).order_by(ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " + "users_1.id AS users_1_id, users_1.name AS users_1_name " + "FROM users AS users_1 ORDER BY anon_1" + ) + + def test_order_by_column_unlabeled_prop_attr_aliased_two(self): + User = self.classes.User + self._fixture(label=False) + + ua = aliased(User) + s = Session() + q = s.query(ua.ead).order_by(ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses, " + "users AS users_1 WHERE addresses.user_id = users_1.id) " + "AS anon_1 ORDER BY anon_1" + ) + + # we're also testing that the state of "ua" is OK after the + # previous call, so the batching into one test is intentional + q = s.query(ua).order_by(ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " + "users_1.id AS users_1_id, users_1.name AS users_1_name " + "FROM users AS users_1 ORDER BY anon_1" + ) + + def test_order_by_column_unlabeled_prop_attr_aliased_three(self): + User = self.classes.User + self._fixture(label=False) + + ua = aliased(User) + s = Session() + q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses, users WHERE addresses.user_id = users.id) " + "AS anon_1, (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses, users AS users_1 " + "WHERE addresses.user_id = users_1.id) AS anon_2 " + "ORDER BY anon_1, anon_2" + ) + + q = s.query(User, ua).order_by(User.ead, ua.ead) + self.assert_compile( + q, + "SELECT (SELECT max(addresses.email_address) AS max_1 " + "FROM addresses WHERE addresses.user_id = users.id) AS " + "anon_1, users.id AS users_id, users.name AS users_name, " + "(SELECT max(addresses.email_address) AS max_1 FROM addresses " + "WHERE addresses.user_id = users_1.id) AS anon_2, users_1.id " + "AS users_1_id, users_1.name AS users_1_name FROM users, " + "users AS users_1 ORDER BY anon_1, anon_2" + ) + + def test_order_by_column_prop_attr(self): User, Address = self.classes("User", "Address") - self._fixture() + self._fixture(label=True) s = Session() q = s.query(User).order_by(User.ead) @@ -1281,9 +1443,9 @@ class ColumnPropertyTest(_fixtures.FixtureTest, AssertsCompiledSQL): "FROM users ORDER BY email_ad" ) - def test_order_by_column_prop_attrname_non_present(self): + def test_order_by_column_prop_attr_non_present(self): User, Address = self.classes("User", "Address") - self._fixture() + self._fixture(label=True) s = Session() q = s.query(User).options(defer(User.ead)).order_by(User.ead) diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 1140a1180..013ba8082 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -10,7 +10,7 @@ from sqlalchemy.sql.visitors import ClauseVisitor, CloningVisitor, \ cloned_traverse, ReplacingCloningVisitor from sqlalchemy import exc from sqlalchemy.sql import util as sql_util -from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message +from sqlalchemy.testing import eq_, is_, is_not_, assert_raises, assert_raises_message A = B = t1 = t2 = t3 = table1 = table2 = table3 = table4 = None @@ -696,6 +696,244 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): "AS anon_1 WHERE table1.col1 = anon_1.col1)") +class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + @classmethod + def setup_class(cls): + global t1, t2 + t1 = table("table1", + column("col1"), + column("col2"), + column("col3"), + column("col4") + ) + t2 = table("table2", + column("col1"), + column("col2"), + column("col3"), + ) + + def test_traverse_memoizes_w_columns(self): + t1a = t1.alias() + adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) + + expr = select([t1a.c.col1]).label('x') + expr_adapted = adapter.traverse(expr) + is_not_(expr, expr_adapted) + is_( + adapter.columns[expr], + expr_adapted + ) + + def test_traverse_memoizes_w_itself(self): + t1a = t1.alias() + adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) + + expr = select([t1a.c.col1]).label('x') + expr_adapted = adapter.traverse(expr) + is_not_(expr, expr_adapted) + is_( + adapter.traverse(expr), + expr_adapted + ) + + def test_columns_memoizes_w_itself(self): + t1a = t1.alias() + adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) + + expr = select([t1a.c.col1]).label('x') + expr_adapted = adapter.columns[expr] + is_not_(expr, expr_adapted) + is_( + adapter.columns[expr], + expr_adapted + ) + + def test_wrapping_fallthrough(self): + t1a = t1.alias(name="t1a") + t2a = t2.alias(name="t2a") + a1 = sql_util.ColumnAdapter(t1a) + + s1 = select([t1a.c.col1, t2a.c.col1]).apply_labels().alias() + a2 = sql_util.ColumnAdapter(s1) + a3 = a2.wrap(a1) + a4 = a1.wrap(a2) + a5 = a1.chain(a2) + + # t1.c.col1 -> s1.c.t1a_col1 + + # adapted by a2 + is_( + a3.columns[t1.c.col1], s1.c.t1a_col1 + ) + is_( + a4.columns[t1.c.col1], s1.c.t1a_col1 + ) + + # chaining can't fall through because a1 grabs it + # first + is_( + a5.columns[t1.c.col1], t1a.c.col1 + ) + + # t2.c.col1 -> s1.c.t2a_col1 + + # adapted by a2 + is_( + a3.columns[t2.c.col1], s1.c.t2a_col1 + ) + is_( + a4.columns[t2.c.col1], s1.c.t2a_col1 + ) + # chaining, t2 hits s1 + is_( + a5.columns[t2.c.col1], s1.c.t2a_col1 + ) + + # t1.c.col2 -> t1a.c.col2 + + # fallthrough to a1 + is_( + a3.columns[t1.c.col2], t1a.c.col2 + ) + is_( + a4.columns[t1.c.col2], t1a.c.col2 + ) + + # chaining hits a1 + is_( + a5.columns[t1.c.col2], t1a.c.col2 + ) + + # t2.c.col2 -> t2.c.col2 + + # fallthrough to no adaption + is_( + a3.columns[t2.c.col2], t2.c.col2 + ) + is_( + a4.columns[t2.c.col2], t2.c.col2 + ) + + def test_wrapping_ordering(self): + """illustrate an example where order of wrappers matters. + + This test illustrates both the ordering being significant + as well as a scenario where multiple translations are needed + (e.g. wrapping vs. chaining). + + """ + + stmt = select([t1.c.col1, t2.c.col1]).apply_labels() + + sa = stmt.alias() + stmt2 = select([t2, sa]) + + a1 = sql_util.ColumnAdapter(stmt) + a2 = sql_util.ColumnAdapter(stmt2) + + a2_to_a1 = a2.wrap(a1) + a1_to_a2 = a1.wrap(a2) + + # when stmt2 and stmt represent the same column + # in different contexts, order of wrapping matters + + # t2.c.col1 via a2 is stmt2.c.col1; then ignored by a1 + is_( + a2_to_a1.columns[t2.c.col1], stmt2.c.col1 + ) + # t2.c.col1 via a1 is stmt.c.table2_col1; a2 then + # sends this to stmt2.c.table2_col1 + is_( + a1_to_a2.columns[t2.c.col1], stmt2.c.table2_col1 + ) + + # for mutually exclusive columns, order doesn't matter + is_( + a2_to_a1.columns[t1.c.col1], stmt2.c.table1_col1 + ) + is_( + a1_to_a2.columns[t1.c.col1], stmt2.c.table1_col1 + ) + is_( + a2_to_a1.columns[t2.c.col2], stmt2.c.col2 + ) + + + def test_wrapping_multiple(self): + """illustrate that wrapping runs both adapters""" + + t1a = t1.alias(name="t1a") + t2a = t2.alias(name="t2a") + a1 = sql_util.ColumnAdapter(t1a) + a2 = sql_util.ColumnAdapter(t2a) + a3 = a2.wrap(a1) + + stmt = select([t1.c.col1, t2.c.col2]) + + self.assert_compile( + a3.traverse(stmt), + "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a" + ) + + # chaining does too because these adapters don't share any + # columns + a4 = a2.chain(a1) + self.assert_compile( + a4.traverse(stmt), + "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a" + ) + + def test_wrapping_inclusions(self): + """test wrapping and inclusion rules together, + taking into account multiple objects with equivalent hash identity.""" + + t1a = t1.alias(name="t1a") + t2a = t2.alias(name="t2a") + a1 = sql_util.ColumnAdapter( + t1a, + include_fn=lambda col: "a1" in col._annotations) + + s1 = select([t1a, t2a]).apply_labels().alias() + a2 = sql_util.ColumnAdapter( + s1, + include_fn=lambda col: "a2" in col._annotations) + a3 = a2.wrap(a1) + + c1a1 = t1.c.col1._annotate(dict(a1=True)) + c1a2 = t1.c.col1._annotate(dict(a2=True)) + c1aa = t1.c.col1._annotate(dict(a1=True, a2=True)) + + c2a1 = t2.c.col1._annotate(dict(a1=True)) + c2a2 = t2.c.col1._annotate(dict(a2=True)) + c2aa = t2.c.col1._annotate(dict(a1=True, a2=True)) + + is_( + a3.columns[c1a1], t1a.c.col1 + ) + is_( + a3.columns[c1a2], s1.c.t1a_col1 + ) + is_( + a3.columns[c1aa], s1.c.t1a_col1 + ) + + # not covered by a1, accepted by a2 + is_( + a3.columns[c2aa], s1.c.t2a_col1 + ) + + # not covered by a1, accepted by a2 + is_( + a3.columns[c2a2], s1.c.t2a_col1 + ) + # not covered by a1, rejected by a2 + is_( + a3.columns[c2a1], c2a1 + ) + + class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -1022,7 +1260,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): assert str(e) == "a.id = a.xxx_id" b = a.alias() - e = sql_util.ClauseAdapter(b, include=set([a.c.id]), + e = sql_util.ClauseAdapter(b, include_fn=lambda x: x in set([a.c.id]), equivalents={a.c.id: set([a.c.id])} ).traverse(e) @@ -1254,6 +1492,28 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "ORDER BY anon_1, anon_2" ) + def test_label_anonymize_three(self): + t1a = t1.alias() + adapter = sql_util.ColumnAdapter( + t1a, anonymize_labels=True, + allow_label_resolve=False) + + expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None) + l1 = expr + is_(l1._order_by_label_element, l1) + eq_(l1._allow_label_resolve, True) + + expr_adapted = adapter.traverse(expr) + l2 = expr_adapted + is_(l2._order_by_label_element, l2) + eq_(l2._allow_label_resolve, False) + + l3 = adapter.traverse(expr) + is_(l3._order_by_label_element, l3) + eq_(l3._allow_label_resolve, False) + + + class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index c5736b26f..a3b2b0e93 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1724,6 +1724,13 @@ class AnnotationsTest(fixtures.TestBase): b5 = visitors.cloned_traverse(b3, {}, {'binary': visit_binary}) assert str(b5) == ":bar = table1.col2" + def test_label_accessors(self): + t1 = table('t1', column('c1')) + l1 = t1.c.c1.label(None) + is_(l1._order_by_label_element, l1) + l1a = l1._annotate({"foo": "bar"}) + is_(l1a._order_by_label_element, l1a) + def test_annotate_aliased(self): t1 = table('t1', column('c1')) s = select([(t1.c.c1 + 3).label('bat')]) |