diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-12-03 17:28:36 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-12-03 17:28:36 +0000 |
| commit | 0410eae36b36dc8ea7e747c4b81c7ec9de5f2da4 (patch) | |
| tree | 86a53b2b2c33a3348d6aca9c7bc825315816748f /test | |
| parent | 851a14aa1a0e7a734a6f810f0e6e5c39d8e63b1b (diff) | |
| download | sqlalchemy-0410eae36b36dc8ea7e747c4b81c7ec9de5f2da4.tar.gz | |
- Two fixes to help prevent out-of-band columns from
being rendered in polymorphic_union inheritance
scenarios (which then causes extra tables to be
rendered in the FROM clause causing cartesian
products):
- improvements to "column adaption" for
a->b->c inheritance situations to better
locate columns that are related to one
another via multiple levels of indirection,
rather than rendering the non-adapted
column.
- the "polymorphic discriminator" column is
only rendered for the actual mapper being
queried against. The column won't be
"pulled in" from a subclass or superclass
mapper since it's not needed.
Diffstat (limited to 'test')
| -rw-r--r-- | test/orm/inheritance/concrete.py | 96 | ||||
| -rw-r--r-- | test/sql/generative.py | 26 |
2 files changed, 120 insertions, 2 deletions
diff --git a/test/orm/inheritance/concrete.py b/test/orm/inheritance/concrete.py index b8d799b60..e6277f3e9 100644 --- a/test/orm/inheritance/concrete.py +++ b/test/orm/inheritance/concrete.py @@ -7,12 +7,18 @@ from sqlalchemy.orm import attributes class ConcreteTest(ORMTest): def define_tables(self, metadata): - global managers_table, engineers_table, hackers_table, companies + global managers_table, engineers_table, hackers_table, companies, employees_table companies = Table('companies', metadata, Column('id', Integer, primary_key=True), Column('name', String(50))) + employees_table = Table('employees', metadata, + Column('employee_id', Integer, primary_key=True), + Column('name', String(50)), + Column('company_id', Integer, ForeignKey('companies.id')) + ) + managers_table = Table('managers', metadata, Column('employee_id', Integer, primary_key=True), Column('name', String(50)), @@ -80,7 +86,7 @@ class ConcreteTest(ORMTest): session.expire(manager, ['manager_data']) self.assertEquals(manager.manager_data, "knows how to manage things") - def test_multi_level(self): + def test_multi_level_no_base(self): class Employee(object): def __init__(self, name): self.name = name @@ -157,6 +163,92 @@ class ConcreteTest(ORMTest): assert set([repr(x) for x in session.query(Employee).all()]) == set(["Engineer Jerry knows how to program", "Manager Tom knows how to manage things", "Hacker Kurt 'Badass' knows how to hack"]) assert set([repr(x) for x in session.query(Manager).all()]) == set(["Manager Tom knows how to manage things"]) assert set([repr(x) for x in session.query(Engineer).all()]) == set(["Engineer Jerry knows how to program", "Hacker Kurt 'Badass' knows how to hack"]) + assert set([repr(x) for x in session.query(Hacker).all()]) == set(["Hacker Kurt 'Badass' knows how to hack"]) + + def test_multi_level_with_base(self): + class Employee(object): + def __init__(self, name): + self.name = name + def __repr__(self): + return self.__class__.__name__ + " " + self.name + + class Manager(Employee): + def __init__(self, name, manager_data): + self.name = name + self.manager_data = manager_data + def __repr__(self): + return self.__class__.__name__ + " " + self.name + " " + self.manager_data + + class Engineer(Employee): + def __init__(self, name, engineer_info): + self.name = name + self.engineer_info = engineer_info + def __repr__(self): + return self.__class__.__name__ + " " + self.name + " " + self.engineer_info + + class Hacker(Engineer): + def __init__(self, name, nickname, engineer_info): + self.name = name + self.nickname = nickname + self.engineer_info = engineer_info + def __repr__(self): + return self.__class__.__name__ + " " + self.name + " '" + \ + self.nickname + "' " + self.engineer_info + + pjoin = polymorphic_union({ + 'employee':employees_table, + 'manager': managers_table, + 'engineer': engineers_table, + 'hacker': hackers_table + }, 'type', 'pjoin') + + pjoin2 = polymorphic_union({ + 'engineer': engineers_table, + 'hacker': hackers_table + }, 'type', 'pjoin2') + + employee_mapper = mapper(Employee, employees_table, with_polymorphic=('*', pjoin), polymorphic_on=pjoin.c.type) + manager_mapper = mapper(Manager, managers_table, + inherits=employee_mapper, concrete=True, + polymorphic_identity='manager') + engineer_mapper = mapper(Engineer, engineers_table, + with_polymorphic=('*', pjoin2), + polymorphic_on=pjoin2.c.type, + inherits=employee_mapper, concrete=True, + polymorphic_identity='engineer') + hacker_mapper = mapper(Hacker, hackers_table, + inherits=engineer_mapper, + concrete=True, polymorphic_identity='hacker') + + session = create_session() + tom = Manager('Tom', 'knows how to manage things') + jerry = Engineer('Jerry', 'knows how to program') + hacker = Hacker('Kurt', 'Badass', 'knows how to hack') + session.add_all((tom, jerry, hacker)) + session.flush() + + # ensure "readonly" on save logic didn't pollute the expired_attributes + # collection + assert 'nickname' not in attributes.instance_state(jerry).expired_attributes + assert 'name' not in attributes.instance_state(jerry).expired_attributes + assert 'name' not in attributes.instance_state(hacker).expired_attributes + assert 'nickname' not in attributes.instance_state(hacker).expired_attributes + def go(): + self.assertEquals(jerry.name, "Jerry") + self.assertEquals(hacker.nickname, "Badass") + self.assert_sql_count(testing.db, go, 0) + + session.clear() + + # check that we aren't getting a cartesian product in the raw SQL. + # this requires that Engineer's polymorphic discriminator is not rendered + # in the statement which is only against Employee's "pjoin" + assert len(testing.db.execute(session.query(Employee).with_labels().statement).fetchall()) == 3 + + assert set([repr(x) for x in session.query(Employee).all()]) == set(["Engineer Jerry knows how to program", "Manager Tom knows how to manage things", "Hacker Kurt 'Badass' knows how to hack"]) + assert set([repr(x) for x in session.query(Manager).all()]) == set(["Manager Tom knows how to manage things"]) + assert set([repr(x) for x in session.query(Engineer).all()]) == set(["Engineer Jerry knows how to program", "Hacker Kurt 'Badass' knows how to hack"]) + assert set([repr(x) for x in session.query(Hacker).all()]) == set(["Hacker Kurt 'Badass' knows how to hack"]) def test_relation(self): class Employee(object): diff --git a/test/sql/generative.py b/test/sql/generative.py index f6b849e8a..4edf334f6 100644 --- a/test/sql/generative.py +++ b/test/sql/generative.py @@ -458,6 +458,32 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): assert str(e) == "a_1.id = a.xxx_id" + def test_recursive_equivalents(self): + m = MetaData() + a = Table('a', m, Column('x', Integer), Column('y', Integer)) + b = Table('b', m, Column('x', Integer), Column('y', Integer)) + c = Table('c', m, Column('x', Integer), Column('y', Integer)) + + # force a recursion overflow, by linking a.c.x<->c.c.x, and + # asking for a nonexistent col. corresponding_column should prevent + # endless depth. + adapt = sql_util.ClauseAdapter( b, equivalents= {a.c.x: set([ c.c.x]), c.c.x:set([a.c.x])}) + assert adapt._corresponding_column(a.c.x, False) is None + + def test_multilevel_equivalents(self): + m = MetaData() + a = Table('a', m, Column('x', Integer), Column('y', Integer)) + b = Table('b', m, Column('x', Integer), Column('y', Integer)) + c = Table('c', m, Column('x', Integer), Column('y', Integer)) + + alias = select([a]).select_from(a.join(b, a.c.x==b.c.x)).alias() + + # two levels of indirection from c.x->b.x->a.x, requires recursive + # corresponding_column call + adapt = sql_util.ClauseAdapter(alias, equivalents= {b.c.x: set([ a.c.x]), c.c.x:set([b.c.x])}) + assert adapt._corresponding_column(a.c.x, False) is alias.c.x + assert adapt._corresponding_column(c.c.x, False) is alias.c.x + def test_join_to_alias(self): metadata = MetaData() a = Table('a', metadata, |
