summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2008-12-03 17:28:36 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2008-12-03 17:28:36 +0000
commit0410eae36b36dc8ea7e747c4b81c7ec9de5f2da4 (patch)
tree86a53b2b2c33a3348d6aca9c7bc825315816748f /test
parent851a14aa1a0e7a734a6f810f0e6e5c39d8e63b1b (diff)
downloadsqlalchemy-0410eae36b36dc8ea7e747c4b81c7ec9de5f2da4.tar.gz
- Two fixes to help prevent out-of-band columns from
being rendered in polymorphic_union inheritance scenarios (which then causes extra tables to be rendered in the FROM clause causing cartesian products): - improvements to "column adaption" for a->b->c inheritance situations to better locate columns that are related to one another via multiple levels of indirection, rather than rendering the non-adapted column. - the "polymorphic discriminator" column is only rendered for the actual mapper being queried against. The column won't be "pulled in" from a subclass or superclass mapper since it's not needed.
Diffstat (limited to 'test')
-rw-r--r--test/orm/inheritance/concrete.py96
-rw-r--r--test/sql/generative.py26
2 files changed, 120 insertions, 2 deletions
diff --git a/test/orm/inheritance/concrete.py b/test/orm/inheritance/concrete.py
index b8d799b60..e6277f3e9 100644
--- a/test/orm/inheritance/concrete.py
+++ b/test/orm/inheritance/concrete.py
@@ -7,12 +7,18 @@ from sqlalchemy.orm import attributes
class ConcreteTest(ORMTest):
def define_tables(self, metadata):
- global managers_table, engineers_table, hackers_table, companies
+ global managers_table, engineers_table, hackers_table, companies, employees_table
companies = Table('companies', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)))
+ employees_table = Table('employees', metadata,
+ Column('employee_id', Integer, primary_key=True),
+ Column('name', String(50)),
+ Column('company_id', Integer, ForeignKey('companies.id'))
+ )
+
managers_table = Table('managers', metadata,
Column('employee_id', Integer, primary_key=True),
Column('name', String(50)),
@@ -80,7 +86,7 @@ class ConcreteTest(ORMTest):
session.expire(manager, ['manager_data'])
self.assertEquals(manager.manager_data, "knows how to manage things")
- def test_multi_level(self):
+ def test_multi_level_no_base(self):
class Employee(object):
def __init__(self, name):
self.name = name
@@ -157,6 +163,92 @@ class ConcreteTest(ORMTest):
assert set([repr(x) for x in session.query(Employee).all()]) == set(["Engineer Jerry knows how to program", "Manager Tom knows how to manage things", "Hacker Kurt 'Badass' knows how to hack"])
assert set([repr(x) for x in session.query(Manager).all()]) == set(["Manager Tom knows how to manage things"])
assert set([repr(x) for x in session.query(Engineer).all()]) == set(["Engineer Jerry knows how to program", "Hacker Kurt 'Badass' knows how to hack"])
+ assert set([repr(x) for x in session.query(Hacker).all()]) == set(["Hacker Kurt 'Badass' knows how to hack"])
+
+ def test_multi_level_with_base(self):
+ class Employee(object):
+ def __init__(self, name):
+ self.name = name
+ def __repr__(self):
+ return self.__class__.__name__ + " " + self.name
+
+ class Manager(Employee):
+ def __init__(self, name, manager_data):
+ self.name = name
+ self.manager_data = manager_data
+ def __repr__(self):
+ return self.__class__.__name__ + " " + self.name + " " + self.manager_data
+
+ class Engineer(Employee):
+ def __init__(self, name, engineer_info):
+ self.name = name
+ self.engineer_info = engineer_info
+ def __repr__(self):
+ return self.__class__.__name__ + " " + self.name + " " + self.engineer_info
+
+ class Hacker(Engineer):
+ def __init__(self, name, nickname, engineer_info):
+ self.name = name
+ self.nickname = nickname
+ self.engineer_info = engineer_info
+ def __repr__(self):
+ return self.__class__.__name__ + " " + self.name + " '" + \
+ self.nickname + "' " + self.engineer_info
+
+ pjoin = polymorphic_union({
+ 'employee':employees_table,
+ 'manager': managers_table,
+ 'engineer': engineers_table,
+ 'hacker': hackers_table
+ }, 'type', 'pjoin')
+
+ pjoin2 = polymorphic_union({
+ 'engineer': engineers_table,
+ 'hacker': hackers_table
+ }, 'type', 'pjoin2')
+
+ employee_mapper = mapper(Employee, employees_table, with_polymorphic=('*', pjoin), polymorphic_on=pjoin.c.type)
+ manager_mapper = mapper(Manager, managers_table,
+ inherits=employee_mapper, concrete=True,
+ polymorphic_identity='manager')
+ engineer_mapper = mapper(Engineer, engineers_table,
+ with_polymorphic=('*', pjoin2),
+ polymorphic_on=pjoin2.c.type,
+ inherits=employee_mapper, concrete=True,
+ polymorphic_identity='engineer')
+ hacker_mapper = mapper(Hacker, hackers_table,
+ inherits=engineer_mapper,
+ concrete=True, polymorphic_identity='hacker')
+
+ session = create_session()
+ tom = Manager('Tom', 'knows how to manage things')
+ jerry = Engineer('Jerry', 'knows how to program')
+ hacker = Hacker('Kurt', 'Badass', 'knows how to hack')
+ session.add_all((tom, jerry, hacker))
+ session.flush()
+
+ # ensure "readonly" on save logic didn't pollute the expired_attributes
+ # collection
+ assert 'nickname' not in attributes.instance_state(jerry).expired_attributes
+ assert 'name' not in attributes.instance_state(jerry).expired_attributes
+ assert 'name' not in attributes.instance_state(hacker).expired_attributes
+ assert 'nickname' not in attributes.instance_state(hacker).expired_attributes
+ def go():
+ self.assertEquals(jerry.name, "Jerry")
+ self.assertEquals(hacker.nickname, "Badass")
+ self.assert_sql_count(testing.db, go, 0)
+
+ session.clear()
+
+ # check that we aren't getting a cartesian product in the raw SQL.
+ # this requires that Engineer's polymorphic discriminator is not rendered
+ # in the statement which is only against Employee's "pjoin"
+ assert len(testing.db.execute(session.query(Employee).with_labels().statement).fetchall()) == 3
+
+ assert set([repr(x) for x in session.query(Employee).all()]) == set(["Engineer Jerry knows how to program", "Manager Tom knows how to manage things", "Hacker Kurt 'Badass' knows how to hack"])
+ assert set([repr(x) for x in session.query(Manager).all()]) == set(["Manager Tom knows how to manage things"])
+ assert set([repr(x) for x in session.query(Engineer).all()]) == set(["Engineer Jerry knows how to program", "Hacker Kurt 'Badass' knows how to hack"])
+ assert set([repr(x) for x in session.query(Hacker).all()]) == set(["Hacker Kurt 'Badass' knows how to hack"])
def test_relation(self):
class Employee(object):
diff --git a/test/sql/generative.py b/test/sql/generative.py
index f6b849e8a..4edf334f6 100644
--- a/test/sql/generative.py
+++ b/test/sql/generative.py
@@ -458,6 +458,32 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL):
assert str(e) == "a_1.id = a.xxx_id"
+ def test_recursive_equivalents(self):
+ m = MetaData()
+ a = Table('a', m, Column('x', Integer), Column('y', Integer))
+ b = Table('b', m, Column('x', Integer), Column('y', Integer))
+ c = Table('c', m, Column('x', Integer), Column('y', Integer))
+
+ # force a recursion overflow, by linking a.c.x<->c.c.x, and
+ # asking for a nonexistent col. corresponding_column should prevent
+ # endless depth.
+ adapt = sql_util.ClauseAdapter( b, equivalents= {a.c.x: set([ c.c.x]), c.c.x:set([a.c.x])})
+ assert adapt._corresponding_column(a.c.x, False) is None
+
+ def test_multilevel_equivalents(self):
+ m = MetaData()
+ a = Table('a', m, Column('x', Integer), Column('y', Integer))
+ b = Table('b', m, Column('x', Integer), Column('y', Integer))
+ c = Table('c', m, Column('x', Integer), Column('y', Integer))
+
+ alias = select([a]).select_from(a.join(b, a.c.x==b.c.x)).alias()
+
+ # two levels of indirection from c.x->b.x->a.x, requires recursive
+ # corresponding_column call
+ adapt = sql_util.ClauseAdapter(alias, equivalents= {b.c.x: set([ a.c.x]), c.c.x:set([b.c.x])})
+ assert adapt._corresponding_column(a.c.x, False) is alias.c.x
+ assert adapt._corresponding_column(c.c.x, False) is alias.c.x
+
def test_join_to_alias(self):
metadata = MetaData()
a = Table('a', metadata,