summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/build/changelog/unreleased_11/4185.rst16
-rw-r--r--lib/sqlalchemy/ext/associationproxy.py48
-rw-r--r--lib/sqlalchemy/orm/instrumentation.py35
-rw-r--r--test/ext/test_associationproxy.py158
4 files changed, 244 insertions, 13 deletions
diff --git a/doc/build/changelog/unreleased_11/4185.rst b/doc/build/changelog/unreleased_11/4185.rst
new file mode 100644
index 000000000..8be1f777b
--- /dev/null
+++ b/doc/build/changelog/unreleased_11/4185.rst
@@ -0,0 +1,16 @@
+.. change::
+ :tags: bug, orm
+ :tickets: 4185
+ :versions: 1.2.3
+
+ Fixed regression caused by fix for issue :ticket:`4116` affecting versions
+ 1.2.2 as well as 1.1.15, which had the effect of mis-calculation of the
+ "owning class" of an :class:`.AssociationProxy` as the ``NoneType`` class
+ in some declarative mixin/inheritance situations as well as if the
+ association proxy were accessed off of an un-mapped class. The "figure out
+ the owner" logic has been replaced by an in-depth routine that searches
+ through the complete mapper hierarchy assigned to the class or subclass to
+ determine the correct (we hope) match; will not assign the owner if no
+ match is found. An exception is now raised if the proxy is used
+ against an un-mapped instance.
+
diff --git a/lib/sqlalchemy/ext/associationproxy.py b/lib/sqlalchemy/ext/associationproxy.py
index 8b90f0925..72200c5e2 100644
--- a/lib/sqlalchemy/ext/associationproxy.py
+++ b/lib/sqlalchemy/ext/associationproxy.py
@@ -212,7 +212,12 @@ class AssociationProxy(interfaces.InspectionAttrInfo):
return (self.local_attr, self.remote_attr)
def _get_property(self):
- return (orm.class_mapper(self.owning_class).
+ owning_class = self.owning_class
+ if owning_class is None:
+ raise exc.InvalidRequestError(
+ "This association proxy has no mapped owning class; "
+ "can't locate a mapped property")
+ return (orm.class_mapper(owning_class).
get_property(self.target_collection))
@util.memoized_property
@@ -244,18 +249,33 @@ class AssociationProxy(interfaces.InspectionAttrInfo):
def _target_is_object(self):
return getattr(self.target_class, self.value_attr).impl.uses_objects
+ def _calc_owner(self, obj, class_):
+ if obj is not None and class_ is None:
+ target_cls = type(obj)
+ elif class_ is not None:
+ target_cls = class_
+ else:
+ return
+
+ # we might be getting invoked for a subclass
+ # that is not mapped yet, in some declarative situations.
+ # save until we are mapped
+ try:
+ insp = inspect(target_cls)
+ except exc.NoInspectionAvailable:
+ # can't find a mapper, don't set owner. if we are a not-yet-mapped
+ # subclass, we can also scan through __mro__ to find a mapped
+ # class, but instead just wait for us to be called again against a
+ # mapped class normally.
+ return
+
+ # note we can get our real .key here too
+ owner = insp.mapper.class_manager._locate_owning_manager(self)
+ self.owning_class = owner.class_
+
def __get__(self, obj, class_):
if self.owning_class is None:
- try:
- insp = inspect(class_)
- except exc.NoInspectionAvailable:
- pass
- else:
- if hasattr(insp, 'mapper'):
- self.owning_class = insp.mapper.class_
-
- if self.owning_class is None:
- self.owning_class = type(obj)
+ self._calc_owner(obj, class_)
if obj is None:
return self
@@ -278,7 +298,7 @@ class AssociationProxy(interfaces.InspectionAttrInfo):
def __set__(self, obj, values):
if self.owning_class is None:
- self.owning_class = type(obj)
+ self._calc_owner(obj, None)
if self.scalar:
creator = self.creator and self.creator or self.target_class
@@ -295,7 +315,8 @@ class AssociationProxy(interfaces.InspectionAttrInfo):
def __delete__(self, obj):
if self.owning_class is None:
- self.owning_class = type(obj)
+ self._calc_owner(obj, None)
+
delattr(obj, self.key)
def _initialize_scalar_accessors(self):
@@ -497,6 +518,7 @@ class AssociationProxy(interfaces.InspectionAttrInfo):
def __repr__(self):
return "AssociationProxy(%r, %r)" % (self.target_collection, self.value_attr)
+
class _lazy_collection(object):
def __init__(self, obj, target):
self.ref = weakref.ref(obj)
diff --git a/lib/sqlalchemy/orm/instrumentation.py b/lib/sqlalchemy/orm/instrumentation.py
index 5a96c57cf..d5422b0d0 100644
--- a/lib/sqlalchemy/orm/instrumentation.py
+++ b/lib/sqlalchemy/orm/instrumentation.py
@@ -115,6 +115,41 @@ class ClassManager(dict):
# raises unless self.mapper has been assigned
raise exc.UnmappedClassError(self.class_)
+ def _locate_owning_manager(self, attribute):
+ """Scan through all instrumented classes in our hierarchy
+ searching for the given object as an attribute, and return
+ the bottommost owner.
+
+ E.g.::
+
+ foo = foobar()
+
+ class Parent:
+ attr = foo
+
+ class Child(Parent):
+ pass
+
+ Child.manager._locate_owning_manager(foo) would
+ give us Parent.
+
+ Needed by association proxy to correctly figure out the
+ owning class when the attribute is accessed.
+
+ """
+
+ stack = [None]
+ for supercls in self.class_.__mro__:
+ mgr = manager_of_class(supercls)
+ if not mgr:
+ continue
+ for key in set(supercls.__dict__):
+ val = supercls.__dict__[key]
+ if val is attribute:
+ stack.append(mgr)
+ continue
+ return stack[-1]
+
def _all_sqla_attributes(self, exclude=None):
"""return an iterator of all classbound attributes that are
implement :class:`.InspectionAttr`.
diff --git a/test/ext/test_associationproxy.py b/test/ext/test_associationproxy.py
index 0052bb188..111ba91a8 100644
--- a/test/ext/test_associationproxy.py
+++ b/test/ext/test_associationproxy.py
@@ -15,6 +15,7 @@ from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.testing.mock import Mock, call
from sqlalchemy.testing.assertions import expect_warnings
from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.ext.declarative import declared_attr
class DictCollection(dict):
@@ -1891,6 +1892,9 @@ class DictOfTupleUpdateTest(fixtures.TestBase):
class AttributeAccessTest(fixtures.TestBase):
+ def teardown(self):
+ clear_mappers()
+
def test_resolve_aliased_class(self):
Base = declarative_base()
@@ -1914,6 +1918,160 @@ class AttributeAccessTest(fixtures.TestBase):
is_(spec.owning_class, B)
+ def test_resolved_w_subclass(self):
+ # test for issue #4185, as well as several below
+
+ Base = declarative_base()
+
+ class Mixin(object):
+ @declared_attr
+ def children(cls):
+ return association_proxy('_children', 'value')
+
+ # 1. build parent, Mixin.children gets invoked, we add
+ # Parent.children
+ class Parent(Mixin, Base):
+ __tablename__ = 'parent'
+ id = Column(Integer, primary_key=True)
+
+ _children = relationship("Child")
+
+ class Child(Base):
+ __tablename__ = 'child'
+ parent_id = Column(
+ Integer, ForeignKey(Parent.id), primary_key=True)
+
+ # 2. declarative builds up SubParent, scans through all attributes
+ # over all classes. Hits Mixin, hits "children", accesses "children"
+ # in terms of the class, e.g. SubParent.children. SubParent isn't
+ # mapped yet. association proxy then sets up "owning_class"
+ # as NoneType.
+ class SubParent(Parent):
+ __tablename__ = 'subparent'
+ id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
+
+ configure_mappers()
+
+ # 3. which would break here.
+ p1 = Parent()
+ eq_(p1.children, [])
+
+ def test_resolved_to_correct_class_one(self):
+ Base = declarative_base()
+
+ class Parent(Base):
+ __tablename__ = 'parent'
+ id = Column(Integer, primary_key=True)
+ _children = relationship("Child")
+ children = association_proxy('_children', 'value')
+
+ class Child(Base):
+ __tablename__ = 'child'
+ parent_id = Column(
+ Integer, ForeignKey(Parent.id), primary_key=True)
+
+ class SubParent(Parent):
+ __tablename__ = 'subparent'
+ id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
+
+ is_(SubParent.children.owning_class, Parent)
+
+ def test_resolved_to_correct_class_two(self):
+ Base = declarative_base()
+
+ class Parent(Base):
+ __tablename__ = 'parent'
+ id = Column(Integer, primary_key=True)
+ _children = relationship("Child")
+
+ class Child(Base):
+ __tablename__ = 'child'
+ parent_id = Column(
+ Integer, ForeignKey(Parent.id), primary_key=True)
+
+ class SubParent(Parent):
+ __tablename__ = 'subparent'
+ id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
+ children = association_proxy('_children', 'value')
+
+ is_(SubParent.children.owning_class, SubParent)
+
+ def test_resolved_to_correct_class_three(self):
+ Base = declarative_base()
+
+ class Parent(Base):
+ __tablename__ = 'parent'
+ id = Column(Integer, primary_key=True)
+ _children = relationship("Child")
+
+ class Child(Base):
+ __tablename__ = 'child'
+ parent_id = Column(
+ Integer, ForeignKey(Parent.id), primary_key=True)
+
+ class SubParent(Parent):
+ __tablename__ = 'subparent'
+ id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
+ children = association_proxy('_children', 'value')
+
+ class SubSubParent(SubParent):
+ __tablename__ = 'subsubparent'
+ id = Column(Integer, ForeignKey(SubParent.id), primary_key=True)
+
+ is_(SubSubParent.children.owning_class, SubParent)
+
+ def test_resolved_to_correct_class_four(self):
+ Base = declarative_base()
+
+ class Parent(Base):
+ __tablename__ = 'parent'
+ id = Column(Integer, primary_key=True)
+ _children = relationship("Child")
+ children = association_proxy(
+ '_children', 'value', creator=lambda value: Child(value=value))
+
+ class Child(Base):
+ __tablename__ = 'child'
+ parent_id = Column(
+ Integer, ForeignKey(Parent.id), primary_key=True)
+ value = Column(String)
+
+ class SubParent(Parent):
+ __tablename__ = 'subparent'
+ id = Column(Integer, ForeignKey(Parent.id), primary_key=True)
+
+ sp = SubParent()
+ sp.children = 'c'
+ is_(SubParent.children.owning_class, Parent)
+
+ def test_never_assign_nonetype(self):
+ foo = association_proxy('x', 'y')
+ foo._calc_owner(None, None)
+ is_(foo.owning_class, None)
+
+ class Bat(object):
+ foo = association_proxy('x', 'y')
+
+ Bat.foo
+ is_(Bat.foo.owning_class, None)
+
+ b1 = Bat()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This association proxy has no mapped owning class; "
+ "can't locate a mapped property",
+ getattr, b1, "foo"
+ )
+ is_(Bat.foo.owning_class, None)
+
+ # after all that, we can map it
+ mapper(
+ Bat,
+ Table('bat', MetaData(), Column('x', Integer, primary_key=True)))
+
+ # answer is correct
+ is_(Bat.foo.owning_class, Bat)
+
class InfoTest(fixtures.TestBase):
def test_constructor(self):