diff options
Diffstat (limited to 'test/orm')
-rw-r--r-- | test/orm/inheritance/test_assorted_poly.py | 168 | ||||
-rw-r--r-- | test/orm/test_cache_key.py | 4 |
2 files changed, 171 insertions, 1 deletions
diff --git a/test/orm/inheritance/test_assorted_poly.py b/test/orm/inheritance/test_assorted_poly.py index a40a9ae74..28480c89e 100644 --- a/test/orm/inheritance/test_assorted_poly.py +++ b/test/orm/inheritance/test_assorted_poly.py @@ -7,6 +7,7 @@ from __future__ import annotations from typing import Optional +from sqlalchemy import and_ from sqlalchemy import exists from sqlalchemy import ForeignKey from sqlalchemy import func @@ -37,6 +38,7 @@ from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import AssertsExecutionResults from sqlalchemy.testing import config from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing.fixtures import ComparableEntity from sqlalchemy.testing.fixtures import fixture_session @@ -2765,3 +2767,169 @@ class PolyIntoSelfReferentialTest( assert False self._run_load(opt) + + +class AdaptExistsSubqTest(fixtures.DeclarativeMappedTest): + """test for #9777""" + + @classmethod + def setup_classes(cls): + Base = cls.DeclarativeBasic + + class Discriminator(Base): + __tablename__ = "discriminator" + id = Column(Integer, primary_key=True, autoincrement=False) + value = Column(String(50)) + + class Entity(Base): + __tablename__ = "entity" + __mapper_args__ = {"polymorphic_on": "type"} + + id = Column(Integer, primary_key=True, autoincrement=False) + type = Column(String(50)) + + discriminator_id = Column( + ForeignKey("discriminator.id"), nullable=False + ) + discriminator = relationship( + "Discriminator", foreign_keys=discriminator_id + ) + + class Parent(Entity): + __tablename__ = "parent" + __mapper_args__ = {"polymorphic_identity": "parent"} + + id = Column(Integer, ForeignKey("entity.id"), primary_key=True) + some_data = Column(String(30)) + + class Child(Entity): + __tablename__ = "child" + __mapper_args__ = {"polymorphic_identity": "child"} + + id = Column(Integer, ForeignKey("entity.id"), primary_key=True) + + some_data = Column(String(30)) + parent_id = Column(ForeignKey("parent.id"), nullable=False) + parent = relationship( + "Parent", + foreign_keys=parent_id, + backref="children", + ) + + @classmethod + def insert_data(cls, connection): + Parent, Child, Discriminator = cls.classes( + "Parent", "Child", "Discriminator" + ) + + with Session(connection) as sess: + discriminator_zero = Discriminator(id=1, value="zero") + discriminator_one = Discriminator(id=2, value="one") + discriminator_two = Discriminator(id=3, value="two") + + parent = Parent(id=1, discriminator=discriminator_zero) + child_1 = Child( + id=2, + discriminator=discriminator_one, + parent=parent, + some_data="c1data", + ) + child_2 = Child( + id=3, + discriminator=discriminator_two, + parent=parent, + some_data="c2data", + ) + sess.add_all([parent, child_1, child_2]) + sess.commit() + + def test_explicit_aliasing(self): + Parent, Child, Discriminator = self.classes( + "Parent", "Child", "Discriminator" + ) + + parent_id = 1 + discriminator_one_id = 2 + + session = fixture_session() + c_alias = aliased(Child, flat=True) + retrieved = ( + session.query(Parent) + .filter_by(id=parent_id) + .outerjoin( + Parent.children.of_type(c_alias).and_( + c_alias.discriminator.has( + and_( + Discriminator.id == discriminator_one_id, + c_alias.some_data == "c1data", + ) + ) + ) + ) + .options(contains_eager(Parent.children.of_type(c_alias))) + .populate_existing() + .one() + ) + eq_(len(retrieved.children), 1) + + def test_implicit_aliasing(self): + Parent, Child, Discriminator = self.classes( + "Parent", "Child", "Discriminator" + ) + + parent_id = 1 + discriminator_one_id = 2 + + session = fixture_session() + q = ( + session.query(Parent) + .filter_by(id=parent_id) + .outerjoin( + Parent.children.and_( + Child.discriminator.has( + and_( + Discriminator.id == discriminator_one_id, + Child.some_data == "c1data", + ) + ) + ) + ) + .options(contains_eager(Parent.children)) + .populate_existing() + ) + + with expect_warnings("An alias is being generated automatically"): + retrieved = q.one() + + eq_(len(retrieved.children), 1) + + @testing.combinations(joinedload, selectinload, argnames="loader") + def test_eager_loaders(self, loader): + Parent, Child, Discriminator = self.classes( + "Parent", "Child", "Discriminator" + ) + + parent_id = 1 + discriminator_one_id = 2 + + session = fixture_session() + retrieved = ( + session.query(Parent) + .filter_by(id=parent_id) + .options( + loader( + Parent.children.and_( + Child.discriminator.has( + and_( + Discriminator.id == discriminator_one_id, + Child.some_data == "c1data", + ) + ) + ) + ) + ) + .populate_existing() + .one() + ) + + eq_(len(retrieved.children), 1) diff --git a/test/orm/test_cache_key.py b/test/orm/test_cache_key.py index 884baed62..5e5a85761 100644 --- a/test/orm/test_cache_key.py +++ b/test/orm/test_cache_key.py @@ -1143,7 +1143,9 @@ class EmbeddedSubqTest( Base.registry.configure() @testing.combinations( - "tuples", ("memory", testing.requires.is64bit), argnames="assert_on" + "tuples", + ("memory", testing.requires.is64bit + testing.requires.cpython), + argnames="assert_on", ) def test_cache_key_gen(self, assert_on): Employee = self.classes.Employee |