import datetime import sqlalchemy as sa from sqlalchemy import and_ from sqlalchemy import exc from sqlalchemy import ForeignKey from sqlalchemy import ForeignKeyConstraint from sqlalchemy import func from sqlalchemy import inspect from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy import select from sqlalchemy import String from sqlalchemy import testing from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import attributes from sqlalchemy.orm import backref from sqlalchemy.orm import clear_mappers from sqlalchemy.orm import column_property from sqlalchemy.orm import composite from sqlalchemy.orm import configure_mappers from sqlalchemy.orm import create_session from sqlalchemy.orm import foreign from sqlalchemy.orm import joinedload from sqlalchemy.orm import mapper from sqlalchemy.orm import relation from sqlalchemy.orm import relationship from sqlalchemy.orm import remote from sqlalchemy.orm import Session from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import subqueryload from sqlalchemy.orm import synonym from sqlalchemy.orm.interfaces import MANYTOONE from sqlalchemy.orm.interfaces import ONETOMANY from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import in_ from sqlalchemy.testing import is_ from sqlalchemy.testing import startswith_ from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table from test.orm import _fixtures class _RelationshipErrors(object): def _assert_raises_no_relevant_fks( self, fn, expr, relname, primary, *arg, **kw ): assert_raises_message( sa.exc.ArgumentError, "Could not locate any relevant foreign key columns " "for %s join condition '%s' on relationship %s. 
" "Ensure that referencing columns are associated with " "a ForeignKey or ForeignKeyConstraint, or are annotated " r"in the join condition with the foreign\(\) annotation." % (primary, expr, relname), fn, *arg, **kw ) def _assert_raises_no_equality( self, fn, expr, relname, primary, *arg, **kw ): assert_raises_message( sa.exc.ArgumentError, "Could not locate any simple equality expressions " "involving locally mapped foreign key columns for %s join " "condition '%s' on relationship %s. " "Ensure that referencing columns are associated with a " "ForeignKey or ForeignKeyConstraint, or are annotated in " r"the join condition with the foreign\(\) annotation. " "To allow comparison operators other than '==', " "the relationship can be marked as viewonly=True." % (primary, expr, relname), fn, *arg, **kw ) def _assert_raises_ambig_join( self, fn, relname, secondary_arg, *arg, **kw ): if secondary_arg is not None: assert_raises_message( exc.ArgumentError, "Could not determine join condition between " "parent/child tables on relationship %s - " "there are multiple foreign key paths linking the " "tables via secondary table '%s'. " "Specify the 'foreign_keys' argument, providing a list " "of those columns which should be counted as " "containing a foreign key reference from the " "secondary table to each of the parent and child tables." % (relname, secondary_arg), fn, *arg, **kw ) else: assert_raises_message( exc.ArgumentError, "Could not determine join " "condition between parent/child tables on " "relationship %s - there are multiple foreign key " "paths linking the tables. Specify the " "'foreign_keys' argument, providing a list of those " "columns which should be counted as containing a " "foreign key reference to the parent table." 
% (relname,), fn, *arg, **kw ) def _assert_raises_no_join(self, fn, relname, secondary_arg, *arg, **kw): if secondary_arg is not None: assert_raises_message( exc.NoForeignKeysError, "Could not determine join condition between " "parent/child tables on relationship %s - " "there are no foreign keys linking these tables " "via secondary table '%s'. " "Ensure that referencing columns are associated with a " "ForeignKey " "or ForeignKeyConstraint, or specify 'primaryjoin' and " "'secondaryjoin' expressions" % (relname, secondary_arg), fn, *arg, **kw ) else: assert_raises_message( exc.NoForeignKeysError, "Could not determine join condition between " "parent/child tables on relationship %s - " "there are no foreign keys linking these tables. " "Ensure that referencing columns are associated with a " "ForeignKey " "or ForeignKeyConstraint, or specify a 'primaryjoin' " "expression." % (relname,), fn, *arg, **kw ) def _assert_raises_ambiguous_direction(self, fn, relname, *arg, **kw): assert_raises_message( sa.exc.ArgumentError, "Can't determine relationship" " direction for relationship '%s' - foreign " "key columns within the join condition are present " "in both the parent and the child's mapped tables. " "Ensure that only those columns referring to a parent column " r"are marked as foreign, either via the foreign\(\) annotation or " "via the foreign_keys argument." % relname, fn, *arg, **kw ) def _assert_raises_no_local_remote(self, fn, relname, *arg, **kw): assert_raises_message( sa.exc.ArgumentError, "Relationship %s could not determine " "any unambiguous local/remote column " "pairs based on join condition and remote_side arguments. " r"Consider using the remote\(\) annotation to " "accurately mark those elements of the join " "condition that are on the remote side of the relationship." 
% (relname), fn, *arg, **kw ) class DependencyTwoParentTest(fixtures.MappedTest): """Test flush() when a mapper is dependent on multiple relationships""" run_setup_mappers = "once" run_inserts = "once" run_deletes = None @classmethod def define_tables(cls, metadata): Table( "tbl_a", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(128)), ) Table( "tbl_b", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(128)), ) Table( "tbl_c", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column( "tbl_a_id", Integer, ForeignKey("tbl_a.id"), nullable=False ), Column("name", String(128)), ) Table( "tbl_d", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column( "tbl_c_id", Integer, ForeignKey("tbl_c.id"), nullable=False ), Column("tbl_b_id", Integer, ForeignKey("tbl_b.id")), Column("name", String(128)), ) @classmethod def setup_classes(cls): class A(cls.Basic): pass class B(cls.Basic): pass class C(cls.Basic): pass class D(cls.Basic): pass @classmethod def setup_mappers(cls): A, C, B, D, tbl_b, tbl_c, tbl_a, tbl_d = ( cls.classes.A, cls.classes.C, cls.classes.B, cls.classes.D, cls.tables.tbl_b, cls.tables.tbl_c, cls.tables.tbl_a, cls.tables.tbl_d, ) mapper( A, tbl_a, properties=dict( c_rows=relationship( C, cascade="all, delete-orphan", backref="a_row" ) ), ) mapper(B, tbl_b) mapper( C, tbl_c, properties=dict( d_rows=relationship( D, cascade="all, delete-orphan", backref="c_row" ) ), ) mapper(D, tbl_d, properties=dict(b_row=relationship(B))) @classmethod def insert_data(cls): A, C, B, D = ( cls.classes.A, cls.classes.C, cls.classes.B, cls.classes.D, ) session = create_session() a = A(name="a1") b = B(name="b1") c = C(name="c1", a_row=a) d1 = D(name="d1", b_row=b, c_row=c) # noqa d2 = D(name="d2", b_row=b, c_row=c) # noqa d3 = D(name="d3", b_row=b, c_row=c) # noqa session.add(a) session.add(b) 
session.flush() def test_DeleteRootTable(self): A = self.classes.A session = create_session() a = session.query(A).filter_by(name="a1").one() session.delete(a) session.flush() def test_DeleteMiddleTable(self): C = self.classes.C session = create_session() c = session.query(C).filter_by(name="c1").one() session.delete(c) session.flush() class M2ODontOverwriteFKTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "a", metadata, Column("id", Integer, primary_key=True), Column("bid", ForeignKey("b.id")), ) Table("b", metadata, Column("id", Integer, primary_key=True)) def _fixture(self, uselist=False): a, b = self.tables.a, self.tables.b class A(fixtures.BasicEntity): pass class B(fixtures.BasicEntity): pass mapper(A, a, properties={"b": relationship(B, uselist=uselist)}) mapper(B, b) return A, B def test_joinedload_doesnt_produce_bogus_event(self): A, B = self._fixture() sess = Session() b1 = B() sess.add(b1) sess.flush() a1 = A() sess.add(a1) sess.commit() # test that was broken by #3060 a1 = sess.query(A).options(joinedload("b")).first() a1.bid = b1.id sess.flush() eq_(a1.bid, b1.id) def test_init_doesnt_produce_scalar_event(self): A, B = self._fixture() sess = Session() b1 = B() sess.add(b1) sess.flush() a1 = A() assert a1.b is None a1.bid = b1.id sess.add(a1) sess.flush() assert a1.bid is not None def test_init_doesnt_produce_collection_event(self): A, B = self._fixture(uselist=True) sess = Session() b1 = B() sess.add(b1) sess.flush() a1 = A() assert a1.b == [] a1.bid = b1.id sess.add(a1) sess.flush() assert a1.bid is not None def test_scalar_relationship_overrides_fk(self): A, B = self._fixture() sess = Session() b1 = B() sess.add(b1) sess.flush() a1 = A() a1.bid = b1.id a1.b = None sess.add(a1) sess.flush() assert a1.bid is None def test_collection_relationship_overrides_fk(self): A, B = self._fixture(uselist=True) sess = Session() b1 = B() sess.add(b1) sess.flush() a1 = A() a1.bid = b1.id a1.b = [] sess.add(a1) sess.flush() # this is 
weird assert a1.bid is not None class DirectSelfRefFKTest(fixtures.MappedTest, AssertsCompiledSQL): """Tests the ultimate join condition, a single column that points to itself, e.g. within a SQL function or similar. The test is against a materialized path setup. this is an **extremely** unusual case:: Entity ------ path -------+ ^ | +---------+ In this case, one-to-many and many-to-one are no longer accurate. Both relationships return collections. I'm not sure if this is a good idea. """ __dialect__ = "default" @classmethod def define_tables(cls, metadata): Table( "entity", metadata, Column("path", String(100), primary_key=True) ) @classmethod def setup_classes(cls): class Entity(cls.Basic): def __init__(self, path): self.path = path def _descendants_fixture(self, data=True): Entity = self.classes.Entity entity = self.tables.entity m = mapper( Entity, entity, properties={ "descendants": relationship( Entity, primaryjoin=remote(foreign(entity.c.path)).like( entity.c.path.concat("/%") ), viewonly=True, order_by=entity.c.path, ) }, ) configure_mappers() assert m.get_property("descendants").direction is ONETOMANY if data: return self._fixture() def _anscestors_fixture(self, data=True): Entity = self.classes.Entity entity = self.tables.entity m = mapper( Entity, entity, properties={ "anscestors": relationship( Entity, primaryjoin=entity.c.path.like( remote(foreign(entity.c.path)).concat("/%") ), viewonly=True, order_by=entity.c.path, ) }, ) configure_mappers() assert m.get_property("anscestors").direction is ONETOMANY if data: return self._fixture() def _fixture(self): Entity = self.classes.Entity sess = Session() sess.add_all( [ Entity("/foo"), Entity("/foo/bar1"), Entity("/foo/bar2"), Entity("/foo/bar2/bat1"), Entity("/foo/bar2/bat2"), Entity("/foo/bar3"), Entity("/bar"), Entity("/bar/bat1"), ] ) return sess def test_descendants_lazyload_clause(self): self._descendants_fixture(data=False) Entity = self.classes.Entity self.assert_compile( 
Entity.descendants.property.strategy._lazywhere, "entity.path LIKE :param_1 || :path_1", ) self.assert_compile( Entity.descendants.property.strategy._rev_lazywhere, ":param_1 LIKE entity.path || :path_1", ) def test_ancestors_lazyload_clause(self): self._anscestors_fixture(data=False) Entity = self.classes.Entity # :param_1 LIKE (:param_1 || :path_1) self.assert_compile( Entity.anscestors.property.strategy._lazywhere, ":param_1 LIKE entity.path || :path_1", ) self.assert_compile( Entity.anscestors.property.strategy._rev_lazywhere, "entity.path LIKE :param_1 || :path_1", ) def test_descendants_lazyload(self): sess = self._descendants_fixture() Entity = self.classes.Entity e1 = sess.query(Entity).filter_by(path="/foo").first() eq_( [e.path for e in e1.descendants], [ "/foo/bar1", "/foo/bar2", "/foo/bar2/bat1", "/foo/bar2/bat2", "/foo/bar3", ], ) def test_anscestors_lazyload(self): sess = self._anscestors_fixture() Entity = self.classes.Entity e1 = sess.query(Entity).filter_by(path="/foo/bar2/bat1").first() eq_([e.path for e in e1.anscestors], ["/foo", "/foo/bar2"]) def test_descendants_joinedload(self): sess = self._descendants_fixture() Entity = self.classes.Entity e1 = ( sess.query(Entity) .filter_by(path="/foo") .options(joinedload(Entity.descendants)) .first() ) eq_( [e.path for e in e1.descendants], [ "/foo/bar1", "/foo/bar2", "/foo/bar2/bat1", "/foo/bar2/bat2", "/foo/bar3", ], ) def test_descendants_subqueryload(self): sess = self._descendants_fixture() Entity = self.classes.Entity e1 = ( sess.query(Entity) .filter_by(path="/foo") .options(subqueryload(Entity.descendants)) .first() ) eq_( [e.path for e in e1.descendants], [ "/foo/bar1", "/foo/bar2", "/foo/bar2/bat1", "/foo/bar2/bat2", "/foo/bar3", ], ) def test_anscestors_joinedload(self): sess = self._anscestors_fixture() Entity = self.classes.Entity e1 = ( sess.query(Entity) .filter_by(path="/foo/bar2/bat1") .options(joinedload(Entity.anscestors)) .first() ) eq_([e.path for e in e1.anscestors], ["/foo", 
"/foo/bar2"]) def test_plain_join_descendants(self): self._descendants_fixture(data=False) Entity = self.classes.Entity sess = Session() self.assert_compile( sess.query(Entity).join(Entity.descendants, aliased=True), "SELECT entity.path AS entity_path FROM entity JOIN entity AS " "entity_1 ON entity_1.path LIKE entity.path || :path_1", ) class OverlappingFksSiblingTest(fixtures.TestBase): """Test multiple relationships that use sections of the same composite foreign key. """ def teardown(self): clear_mappers() def _fixture_one( self, add_b_a=False, add_b_a_viewonly=False, add_b_amember=False, add_bsub1_a=False, add_bsub2_a_viewonly=False, ): Base = declarative_base(metadata=self.metadata) class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) a_members = relationship("AMember", backref="a") class AMember(Base): __tablename__ = "a_member" a_id = Column(Integer, ForeignKey("a.id"), primary_key=True) a_member_id = Column(Integer, primary_key=True) class B(Base): __tablename__ = "b" __mapper_args__ = {"polymorphic_on": "type"} id = Column(Integer, primary_key=True) type = Column(String(20)) a_id = Column(Integer, ForeignKey("a.id"), nullable=False) a_member_id = Column(Integer) __table_args__ = ( ForeignKeyConstraint( ("a_id", "a_member_id"), ("a_member.a_id", "a_member.a_member_id"), ), ) # if added and viewonly is not true, this relationship # writes to B.a_id, which conflicts with BSub2.a_member, # so should warn if add_b_a: a = relationship("A", viewonly=add_b_a_viewonly) # if added, this relationship writes to B.a_id, which conflicts # with BSub1.a if add_b_amember: a_member = relationship("AMember") # however, *no* warning should be emitted otherwise. 
class BSub1(B): if add_bsub1_a: a = relationship("A") __mapper_args__ = {"polymorphic_identity": "bsub1"} class BSub2(B): if add_bsub2_a_viewonly: a = relationship("A", viewonly=True) a_member = relationship("AMember") __mapper_args__ = {"polymorphic_identity": "bsub2"} configure_mappers() self.metadata.create_all() return A, AMember, B, BSub1, BSub2 @testing.provide_metadata def _test_fixture_one_run(self, **kw): A, AMember, B, BSub1, BSub2 = self._fixture_one(**kw) bsub2 = BSub2() am1 = AMember(a_member_id=1) a1 = A(a_members=[am1]) bsub2.a_member = am1 bsub1 = BSub1() a2 = A() bsub1.a = a2 session = Session(testing.db) session.add_all([bsub1, bsub2, am1, a1, a2]) session.commit() assert bsub1.a is a2 assert bsub2.a is a1 # meaningless, because BSub1 doesn't have a_member bsub1.a_member = am1 # meaningless, because BSub2's ".a" is viewonly=True bsub2.a = a2 session.commit() assert bsub1.a is a2 # beacuse bsub1.a_member is not a relationship assert bsub2.a is a1 # because bsub2.a is viewonly=True # everyone has a B.a relationship eq_( session.query(B, A).outerjoin(B.a).order_by(B.id).all(), [(bsub1, a2), (bsub2, a1)], ) @testing.provide_metadata def test_warn_one(self): assert_raises_message( exc.SAWarning, r"relationship '(?:BSub1.a|BSub2.a_member|B.a)' will copy column " r"(?:a.id|a_member.a_id) to column b.a_id", self._fixture_one, add_b_a=True, add_bsub1_a=True, ) @testing.provide_metadata def test_warn_two(self): assert_raises_message( exc.SAWarning, r"relationship '(?:BSub1.a|B.a_member)' will copy column " r"(?:a.id|a_member.a_id) to column b.a_id", self._fixture_one, add_b_amember=True, add_bsub1_a=True, ) @testing.provide_metadata def test_warn_three(self): assert_raises_message( exc.SAWarning, r"relationship '(?:BSub1.a|B.a_member|B.a)' will copy column " r"(?:a.id|a_member.a_id) to column b.a_id", self._fixture_one, add_b_amember=True, add_bsub1_a=True, add_b_a=True, ) @testing.provide_metadata def test_works_one(self): self._test_fixture_one_run( 
add_b_a=True, add_b_a_viewonly=True, add_bsub1_a=True ) @testing.provide_metadata def test_works_two(self): self._test_fixture_one_run(add_b_a=True, add_bsub2_a_viewonly=True) class CompositeSelfRefFKTest(fixtures.MappedTest, AssertsCompiledSQL): """Tests a composite FK where, in the relationship(), one col points to itself in the same table. this is a very unusual case:: company employee ---------- ---------- company_id <--- company_id ------+ name ^ | +------------+ emp_id <---------+ name | reports_to_id ---+ employee joins to its sub-employees both on reports_to_id, *and on company_id to itself*. """ __dialect__ = "default" @classmethod def define_tables(cls, metadata): Table( "company_t", metadata, Column( "company_id", Integer, primary_key=True, test_needs_autoincrement=True, ), Column("name", String(30)), ) Table( "employee_t", metadata, Column("company_id", Integer, primary_key=True), Column("emp_id", Integer, primary_key=True), Column("name", String(30)), Column("reports_to_id", Integer), sa.ForeignKeyConstraint(["company_id"], ["company_t.company_id"]), sa.ForeignKeyConstraint( ["company_id", "reports_to_id"], ["employee_t.company_id", "employee_t.emp_id"], ), ) @classmethod def setup_classes(cls): class Company(cls.Basic): def __init__(self, name): self.name = name class Employee(cls.Basic): def __init__(self, name, company, emp_id, reports_to=None): self.name = name self.company = company self.emp_id = emp_id self.reports_to = reports_to def test_explicit(self): Employee, Company, employee_t, company_t = ( self.classes.Employee, self.classes.Company, self.tables.employee_t, self.tables.company_t, ) mapper(Company, company_t) mapper( Employee, employee_t, properties={ "company": relationship( Company, primaryjoin=employee_t.c.company_id == company_t.c.company_id, backref="employees", ), "reports_to": relationship( Employee, primaryjoin=sa.and_( employee_t.c.emp_id == employee_t.c.reports_to_id, employee_t.c.company_id == employee_t.c.company_id, ), 
remote_side=[employee_t.c.emp_id, employee_t.c.company_id], foreign_keys=[ employee_t.c.reports_to_id, employee_t.c.company_id, ], backref=backref( "employees", foreign_keys=[ employee_t.c.reports_to_id, employee_t.c.company_id, ], ), ), }, ) self._test() def test_implicit(self): Employee, Company, employee_t, company_t = ( self.classes.Employee, self.classes.Company, self.tables.employee_t, self.tables.company_t, ) mapper(Company, company_t) mapper( Employee, employee_t, properties={ "company": relationship(Company, backref="employees"), "reports_to": relationship( Employee, remote_side=[employee_t.c.emp_id, employee_t.c.company_id], foreign_keys=[ employee_t.c.reports_to_id, employee_t.c.company_id, ], backref=backref( "employees", foreign_keys=[ employee_t.c.reports_to_id, employee_t.c.company_id, ], ), ), }, ) self._test() def test_very_implicit(self): Employee, Company, employee_t, company_t = ( self.classes.Employee, self.classes.Company, self.tables.employee_t, self.tables.company_t, ) mapper(Company, company_t) mapper( Employee, employee_t, properties={ "company": relationship(Company, backref="employees"), "reports_to": relationship( Employee, remote_side=[employee_t.c.emp_id, employee_t.c.company_id], backref="employees", ), }, ) self._test() def test_very_explicit(self): Employee, Company, employee_t, company_t = ( self.classes.Employee, self.classes.Company, self.tables.employee_t, self.tables.company_t, ) mapper(Company, company_t) mapper( Employee, employee_t, properties={ "company": relationship(Company, backref="employees"), "reports_to": relationship( Employee, _local_remote_pairs=[ (employee_t.c.reports_to_id, employee_t.c.emp_id), (employee_t.c.company_id, employee_t.c.company_id), ], foreign_keys=[ employee_t.c.reports_to_id, employee_t.c.company_id, ], backref=backref( "employees", foreign_keys=[ employee_t.c.reports_to_id, employee_t.c.company_id, ], ), ), }, ) self._test() def test_annotated(self): Employee, Company, employee_t, company_t = ( 
self.classes.Employee, self.classes.Company, self.tables.employee_t, self.tables.company_t, ) mapper(Company, company_t) mapper( Employee, employee_t, properties={ "company": relationship(Company, backref="employees"), "reports_to": relationship( Employee, primaryjoin=sa.and_( remote(employee_t.c.emp_id) == employee_t.c.reports_to_id, remote(employee_t.c.company_id) == employee_t.c.company_id, ), backref=backref("employees"), ), }, ) self._assert_lazy_clauses() self._test() def test_overlapping_warning(self): Employee, Company, employee_t, company_t = ( self.classes.Employee, self.classes.Company, self.tables.employee_t, self.tables.company_t, ) mapper(Company, company_t) mapper( Employee, employee_t, properties={ "company": relationship(Company, backref="employees"), "reports_to": relationship( Employee, primaryjoin=sa.and_( remote(employee_t.c.emp_id) == employee_t.c.reports_to_id, remote(employee_t.c.company_id) == employee_t.c.company_id, ), backref=backref("employees"), ), }, ) assert_raises_message( exc.SAWarning, r"relationship .* will copy column .* to column " r"employee_t.company_id, which conflicts with relationship\(s\)", configure_mappers, ) def test_annotated_no_overwriting(self): Employee, Company, employee_t, company_t = ( self.classes.Employee, self.classes.Company, self.tables.employee_t, self.tables.company_t, ) mapper(Company, company_t) mapper( Employee, employee_t, properties={ "company": relationship(Company, backref="employees"), "reports_to": relationship( Employee, primaryjoin=sa.and_( remote(employee_t.c.emp_id) == foreign(employee_t.c.reports_to_id), remote(employee_t.c.company_id) == employee_t.c.company_id, ), backref=backref("employees"), ), }, ) self._assert_lazy_clauses() self._test_no_warning() def _test_no_overwrite(self, sess, expect_failure): # test [ticket:3230] Employee, Company = self.classes.Employee, self.classes.Company c1 = sess.query(Company).filter_by(name="c1").one() e3 = 
sess.query(Employee).filter_by(name="emp3").one() e3.reports_to = None if expect_failure: # if foreign() isn't applied specifically to # employee_t.c.reports_to_id only, then # employee_t.c.company_id goes foreign as well and then # this happens assert_raises_message( AssertionError, "Dependency rule tried to blank-out primary key column " "'employee_t.company_id'", sess.flush, ) else: sess.flush() eq_(e3.company, c1) @testing.emits_warning("relationship .* will copy column ") def _test(self): self._test_no_warning(overwrites=True) def _test_no_warning(self, overwrites=False): configure_mappers() self._test_relationships() sess = Session() self._setup_data(sess) self._test_lazy_relations(sess) self._test_join_aliasing(sess) self._test_no_overwrite(sess, expect_failure=overwrites) @testing.emits_warning("relationship .* will copy column ") def _assert_lazy_clauses(self): configure_mappers() Employee = self.classes.Employee self.assert_compile( Employee.employees.property.strategy._lazywhere, ":param_1 = employee_t.reports_to_id AND " ":param_2 = employee_t.company_id", ) self.assert_compile( Employee.employees.property.strategy._rev_lazywhere, "employee_t.emp_id = :param_1 AND " "employee_t.company_id = :param_2", ) def _test_relationships(self): Employee = self.classes.Employee employee_t = self.tables.employee_t eq_( set(Employee.employees.property.local_remote_pairs), set( [ (employee_t.c.company_id, employee_t.c.company_id), (employee_t.c.emp_id, employee_t.c.reports_to_id), ] ), ) eq_( Employee.employees.property.remote_side, set([employee_t.c.company_id, employee_t.c.reports_to_id]), ) eq_( set(Employee.reports_to.property.local_remote_pairs), set( [ (employee_t.c.company_id, employee_t.c.company_id), (employee_t.c.reports_to_id, employee_t.c.emp_id), ] ), ) def _setup_data(self, sess): Employee, Company = self.classes.Employee, self.classes.Company c1 = Company("c1") c2 = Company("c2") e1 = Employee("emp1", c1, 1) e2 = Employee("emp2", c1, 2, e1) # noqa e3 = 
Employee("emp3", c1, 3, e1) e4 = Employee("emp4", c1, 4, e3) # noqa e5 = Employee("emp5", c2, 1) e6 = Employee("emp6", c2, 2, e5) # noqa e7 = Employee("emp7", c2, 3, e5) # noqa sess.add_all((c1, c2)) sess.commit() sess.close() def _test_lazy_relations(self, sess): Employee, Company = self.classes.Employee, self.classes.Company c1 = sess.query(Company).filter_by(name="c1").one() c2 = sess.query(Company).filter_by(name="c2").one() e1 = sess.query(Employee).filter_by(name="emp1").one() e5 = sess.query(Employee).filter_by(name="emp5").one() test_e1 = sess.query(Employee).get([c1.company_id, e1.emp_id]) assert test_e1.name == "emp1", test_e1.name test_e5 = sess.query(Employee).get([c2.company_id, e5.emp_id]) assert test_e5.name == "emp5", test_e5.name assert [x.name for x in test_e1.employees] == ["emp2", "emp3"] assert ( sess.query(Employee).get([c1.company_id, 3]).reports_to.name == "emp1" ) assert ( sess.query(Employee).get([c2.company_id, 3]).reports_to.name == "emp5" ) def _test_join_aliasing(self, sess): Employee, Company = self.classes.Employee, self.classes.Company eq_( [ n for n, in sess.query(Employee.name) .join(Employee.reports_to, aliased=True) .filter_by(name="emp5") .reset_joinpoint() .order_by(Employee.name) ], ["emp6", "emp7"], ) class CompositeJoinPartialFK(fixtures.MappedTest, AssertsCompiledSQL): __dialect__ = "default" @classmethod def define_tables(cls, metadata): Table( "parent", metadata, Column("x", Integer, primary_key=True), Column("y", Integer, primary_key=True), Column("z", Integer), ) Table( "child", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("x", Integer), Column("y", Integer), Column("z", Integer), # note 'z' is not here sa.ForeignKeyConstraint(["x", "y"], ["parent.x", "parent.y"]), ) @classmethod def setup_mappers(cls): parent, child = cls.tables.parent, cls.tables.child class Parent(cls.Comparable): pass class Child(cls.Comparable): pass mapper( Parent, parent, properties={ "children": 
relationship( Child, primaryjoin=and_( parent.c.x == child.c.x, parent.c.y == child.c.y, parent.c.z == child.c.z, ), ) }, ) mapper(Child, child) def test_joins_fully(self): Parent, Child = self.classes.Parent, self.classes.Child self.assert_compile( Parent.children.property.strategy._lazywhere, ":param_1 = child.x AND :param_2 = child.y AND :param_3 = child.z", ) class SynonymsAsFKsTest(fixtures.MappedTest): """Syncrules on foreign keys that are also primary""" @classmethod def define_tables(cls, metadata): Table( "tableA", metadata, Column("id", Integer, primary_key=True), Column("foo", Integer), test_needs_fk=True, ) Table( "tableB", metadata, Column("id", Integer, primary_key=True), Column("_a_id", Integer, key="a_id", primary_key=True), test_needs_fk=True, ) @classmethod def setup_classes(cls): class A(cls.Basic): pass class B(cls.Basic): @property def a_id(self): return self._a_id def test_synonym_fk(self): """test that active history is enabled on a one-to-many/one that has use_get==True""" tableB, A, B, tableA = ( self.tables.tableB, self.classes.A, self.classes.B, self.tables.tableA, ) mapper( B, tableB, properties={"a_id": synonym("_a_id", map_column=True)} ) mapper( A, tableA, properties={ "b": relationship( B, primaryjoin=(tableA.c.id == foreign(B.a_id)), uselist=False, ) }, ) sess = create_session() b = B(id=0) a = A(id=0, b=b) sess.add(a) sess.add(b) sess.flush() sess.expunge_all() assert a.b == b assert a.id == b.a_id assert a.id == b._a_id class FKsAsPksTest(fixtures.MappedTest): """Syncrules on foreign keys that are also primary""" @classmethod def define_tables(cls, metadata): Table( "tableA", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("foo", Integer), test_needs_fk=True, ) Table( "tableB", metadata, Column("id", Integer, ForeignKey("tableA.id"), primary_key=True), test_needs_fk=True, ) @classmethod def setup_classes(cls): class A(cls.Basic): pass class B(cls.Basic): pass def 
test_onetoone_switch(self): """test that active history is enabled on a one-to-many/one that has use_get==True""" tableB, A, B, tableA = ( self.tables.tableB, self.classes.A, self.classes.B, self.tables.tableA, ) mapper( A, tableA, properties={ "b": relationship( B, cascade="all,delete-orphan", uselist=False ) }, ) mapper(B, tableB) configure_mappers() assert A.b.property.strategy.use_get sess = create_session() a1 = A() sess.add(a1) sess.flush() sess.close() a1 = sess.query(A).first() a1.b = B() sess.flush() def test_no_delete_PK_AtoB(self): """A cant be deleted without B because B would have no PK value.""" tableB, A, B, tableA = ( self.tables.tableB, self.classes.A, self.classes.B, self.tables.tableA, ) mapper( A, tableA, properties={"bs": relationship(B, cascade="save-update")}, ) mapper(B, tableB) a1 = A() a1.bs.append(B()) sess = create_session() sess.add(a1) sess.flush() sess.delete(a1) try: sess.flush() assert False except AssertionError as e: startswith_( str(e), "Dependency rule tried to blank-out " "primary key column 'tableB.id' on instance ", ) def test_no_delete_PK_BtoA(self): tableB, A, B, tableA = ( self.tables.tableB, self.classes.A, self.classes.B, self.tables.tableA, ) mapper( B, tableB, properties={"a": relationship(A, cascade="save-update")} ) mapper(A, tableA) b1 = B() a1 = A() b1.a = a1 sess = create_session() sess.add(b1) sess.flush() b1.a = None try: sess.flush() assert False except AssertionError as e: startswith_( str(e), "Dependency rule tried to blank-out " "primary key column 'tableB.id' on instance ", ) @testing.fails_on_everything_except( "sqlite", testing.requires.mysql_non_strict ) def test_nullPKsOK_BtoA(self): A, tableA = self.classes.A, self.tables.tableA # postgresql cant handle a nullable PK column...? 
tableC = Table( "tablec", tableA.metadata, Column("id", Integer, primary_key=True), Column( "a_id", Integer, ForeignKey("tableA.id"), primary_key=True, nullable=True, ), ) tableC.create() class C(fixtures.BasicEntity): pass mapper( C, tableC, properties={"a": relationship(A, cascade="save-update")} ) mapper(A, tableA) c1 = C() c1.id = 5 c1.a = None sess = create_session() sess.add(c1) # test that no error is raised. sess.flush() def test_delete_cascade_BtoA(self): """No 'blank the PK' error when the child is to be deleted as part of a cascade""" tableB, A, B, tableA = ( self.tables.tableB, self.classes.A, self.classes.B, self.tables.tableA, ) for cascade in ( "save-update, delete", # "save-update, delete-orphan", "save-update, delete, delete-orphan", ): mapper( B, tableB, properties={ "a": relationship(A, cascade=cascade, single_parent=True) }, ) mapper(A, tableA) b1 = B() a1 = A() b1.a = a1 sess = create_session() sess.add(b1) sess.flush() sess.delete(b1) sess.flush() assert a1 not in sess assert b1 not in sess sess.expunge_all() sa.orm.clear_mappers() def test_delete_cascade_AtoB(self): """No 'blank the PK' error when the child is to be deleted as part of a cascade""" tableB, A, B, tableA = ( self.tables.tableB, self.classes.A, self.classes.B, self.tables.tableA, ) for cascade in ( "save-update, delete", # "save-update, delete-orphan", "save-update, delete, delete-orphan", ): mapper( A, tableA, properties={"bs": relationship(B, cascade=cascade)} ) mapper(B, tableB) a1 = A() b1 = B() a1.bs.append(b1) sess = create_session() sess.add(a1) sess.flush() sess.delete(a1) sess.flush() assert a1 not in sess assert b1 not in sess sess.expunge_all() sa.orm.clear_mappers() def test_delete_manual_AtoB(self): tableB, A, B, tableA = ( self.tables.tableB, self.classes.A, self.classes.B, self.tables.tableA, ) mapper(A, tableA, properties={"bs": relationship(B, cascade="none")}) mapper(B, tableB) a1 = A() b1 = B() a1.bs.append(b1) sess = create_session() sess.add(a1) sess.add(b1) 
# NOTE(review): the statements down to the first ``def`` below continue the
# body of test_delete_manual_AtoB, which starts on the previous physical line.
        sess.flush()
        sess.delete(a1)
        sess.delete(b1)
        sess.flush()
        assert a1 not in sess
        assert b1 not in sess
        sess.expunge_all()

    def test_delete_manual_BtoA(self):
        """Delete both objects explicitly when B.a uses cascade="none"."""
        tableB, A, B, tableA = (
            self.tables.tableB,
            self.classes.A,
            self.classes.B,
            self.tables.tableA,
        )

        mapper(B, tableB, properties={"a": relationship(A, cascade="none")})
        mapper(A, tableA)

        b1 = B()
        a1 = A()
        b1.a = a1
        sess = create_session()
        # With cascade="none" each object is added and deleted by hand.
        sess.add(b1)
        sess.add(a1)
        sess.flush()
        sess.delete(b1)
        sess.delete(a1)
        sess.flush()
        assert a1 not in sess
        assert b1 not in sess


class UniqueColReferenceSwitchTest(fixtures.MappedTest):
    """test a relationship based on a primary
    join against a unique non-pk column"""

    @classmethod
    def define_tables(cls, metadata):
        """Create table_a and table_b; table_b.a_ident references the unique
        column table_a.ident rather than table_a's primary key."""
        Table(
            "table_a",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("ident", String(10), nullable=False, unique=True),
        )

        Table(
            "table_b",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column(
                "a_ident",
                String(10),
                ForeignKey("table_a.ident"),
                nullable=False,
            ),
        )

    @classmethod
    def setup_classes(cls):
        class A(cls.Comparable):
            pass

        class B(cls.Comparable):
            pass

    def test_switch_parent(self):
        """Re-point every B of a1 at a2, then delete a1 and flush."""
        A, B, table_b, table_a = (
            self.classes.A,
            self.classes.B,
            self.tables.table_b,
            self.tables.table_a,
        )

        mapper(A, table_a)
        mapper(B, table_b, properties={"a": relationship(A, backref="bs")})

        session = create_session()
        a1, a2 = A(ident="uuid1"), A(ident="uuid2")
        session.add_all([a1, a2])
        a1.bs = [B(), B()]
        session.flush()
        session.expire_all()
        a1, a2 = session.query(A).all()

        # Iterate over a copy: assigning b.a changes a1.bs through the
        # backref while the loop runs.
        for b in list(a1.bs):
            b.a = a2
        session.delete(a1)
        session.flush()


class RelationshipToSelectableTest(fixtures.MappedTest):
    """Test a map to a select that relates to a map to the table."""

    @classmethod
    def define_tables(cls, metadata):
        """Create the "items" table with a four-column composite primary
        key; each column is given a ``key`` that differs from its name."""
        Table(
            "items",
            metadata,
            Column(
                "item_policy_num",
                String(10),
                primary_key=True,
                key="policyNum",
            ),
            Column(
                "item_policy_eff_date",
                sa.Date,
                primary_key=True,
                key="policyEffDate",
            ),
            Column("item_type", String(20), primary_key=True, key="type"),
            Column(
                "item_id",
                Integer,
                primary_key=True,
                key="id",
                autoincrement=False,
            ),
        )

    def test_basic(self):
        """Map Container to a DISTINCT select over "items" and LineItem to
        the table itself, then round-trip ten line items."""
        items = self.tables.items

        class Container(fixtures.BasicEntity):
            pass

        class LineItem(fixtures.BasicEntity):
            pass

        # The container is the distinct (policyNum, policyEffDate, type)
        # projection of the items table.
        container_select = sa.select(
            [items.c.policyNum, items.c.policyEffDate, items.c.type],
            distinct=True,
        ).alias("container_select")

        mapper(LineItem, items)

        mapper(
            Container,
            container_select,
            properties=dict(
                lineItems=relationship(
                    LineItem,
                    lazy="select",
                    cascade="all, delete-orphan",
                    order_by=sa.asc(items.c.id),
                    primaryjoin=sa.and_(
                        container_select.c.policyNum == items.c.policyNum,
                        container_select.c.policyEffDate
                        == items.c.policyEffDate,
                        container_select.c.type == items.c.type,
                    ),
                    foreign_keys=[
                        items.c.policyNum,
                        items.c.policyEffDate,
                        items.c.type,
                    ],
                )
            ),
        )

        session = create_session()
        con = Container()
        con.policyNum = "99"
        con.policyEffDate = datetime.date.today()
        con.type = "TESTER"
        session.add(con)
        for i in range(0, 10):
            li = LineItem()
            li.id = i
            con.lineItems.append(li)
            session.add(li)
        session.flush()
        session.expunge_all()
        newcon = (
            session.query(Container).order_by(container_select.c.type).first()
        )
        assert con.policyNum == newcon.policyNum
        assert len(newcon.lineItems) == 10
        # The relationship orders by items.c.id, so both lists line up.
        for old, new in zip(con.lineItems, newcon.lineItems):
            eq_(old.id, new.id)


class FKEquatedToConstantTest(fixtures.MappedTest):
    """test a relationship with a non-column entity in the primary join,
    is not viewonly, and also has the non-column's clause mentioned in the
    foreign keys list.

    """

    @classmethod
    def define_tables(cls, metadata):
        """Create "tags" and "tag_foo"; tag_foo.tagid has no ForeignKey."""
        Table(
            "tags",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(50)),
        )

        Table(
            "tag_foo",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("tagid", Integer),
            Column("data", String(50)),
        )

    def test_basic(self):
        """Persist two TagInstances through Tag.foo; only the one matching
        the constant in the primaryjoin is loaded back."""
        tag_foo, tags = self.tables.tag_foo, self.tables.tags

        class Tag(fixtures.ComparableEntity):
            pass

        class TagInstance(fixtures.ComparableEntity):
            pass

        mapper(
            Tag,
            tags,
            properties={
                "foo": relationship(
                    TagInstance,
                    primaryjoin=sa.and_(
                        tag_foo.c.data == "iplc_case",
                        tag_foo.c.tagid == tags.c.id,
                    ),
                    foreign_keys=[tag_foo.c.tagid, tag_foo.c.data],
                )
            },
        )

        mapper(TagInstance, tag_foo)

        sess = create_session()
        t1 = Tag(data="some tag")
        t1.foo.append(TagInstance(data="iplc_case"))
        t1.foo.append(TagInstance(data="not_iplc_case"))
        sess.add(t1)
        sess.flush()
        sess.expunge_all()

        # relationship works
        eq_(
            sess.query(Tag).all(),
            [Tag(data="some tag", foo=[TagInstance(data="iplc_case")])],
        )

        # both TagInstances were persisted
        eq_(
            sess.query(TagInstance).order_by(TagInstance.data).all(),
            [TagInstance(data="iplc_case"), TagInstance(data="not_iplc_case")],
        )


class BackrefPropagatesForwardsArgs(fixtures.MappedTest):
    """A relationship configured only through primaryjoin/foreign_keys
    (no ForeignKey in the schema) is loaded back through its backref."""

    @classmethod
    def define_tables(cls, metadata):
        """Create "users" and "addresses"; addresses.user_id has no
        ForeignKey."""
        Table(
            "users",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("name", String(50)),
        )
        Table(
            "addresses",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("user_id", Integer),
            Column("email", String(50)),
        )

    @classmethod
    def setup_classes(cls):
        class User(cls.Comparable):
            pass

        class Address(cls.Comparable):
            pass

    def test_backref(self):
        """Address.user, created by backref="user", returns the owning
        User after a commit."""
        User, Address, users, addresses = (
            self.classes.User,
            self.classes.Address,
            self.tables.users,
            self.tables.addresses,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    Address,
                    primaryjoin=addresses.c.user_id == users.c.id,
# NOTE(review): the first lines continue the relationship()/mapper() call
# opened in BackrefPropagatesForwardsArgs.test_backref on the previous line.
                    foreign_keys=addresses.c.user_id,
                    backref="user",
                )
            },
        )
        mapper(Address, addresses)

        sess = sessionmaker()()
        u1 = User(name="u1", addresses=[Address(email="a1")])
        sess.add(u1)
        sess.commit()
        eq_(
            sess.query(Address).all(),
            [Address(email="a1", user=User(name="u1"))],
        )


class AmbiguousJoinInterpretedAsSelfRef(fixtures.MappedTest):
    """test ambiguous joins due to FKs on both sides treated as
    self-referential.

    this mapping is very similar to that of
    test/orm/inheritance/query.py
    SelfReferentialTestJoinedToBase , except that inheritance is
    not used here.

    """

    @classmethod
    def define_tables(cls, metadata):
        """Create "subscriber" and "address"; address has a composite
        primary key of (subscriber_id, type)."""
        Table(
            "subscriber",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
        )

        Table(
            "address",
            metadata,
            Column(
                "subscriber_id",
                Integer,
                ForeignKey("subscriber.id"),
                primary_key=True,
            ),
            Column("type", String(1), primary_key=True),
        )

    @classmethod
    def setup_mappers(cls):
        """Map Subscriber to a subscriber/address join and Address to the
        plain address table, linked by relationship "addresses" with
        backref "customer"."""
        subscriber, address = cls.tables.subscriber, cls.tables.address

        # Subscriber rows only exist where an address of type A, B or C
        # is joined to the subscriber.
        subscriber_and_address = subscriber.join(
            address,
            and_(
                address.c.subscriber_id == subscriber.c.id,
                address.c.type.in_(["A", "B", "C"]),
            ),
        )

        class Address(cls.Comparable):
            pass

        class Subscriber(cls.Comparable):
            pass

        mapper(Address, address)

        mapper(
            Subscriber,
            subscriber_and_address,
            properties={
                # Both join columns are mapped under the single "id"
                # attribute.
                "id": [subscriber.c.id, address.c.subscriber_id],
                "addresses": relationship(
                    Address, backref=backref("customer")
                ),
            },
        )

    def test_mapping(self):
        """Check the relationship directions and a save/load round trip."""
        Subscriber, Address = self.classes.Subscriber, self.classes.Address

        sess = create_session()
        assert Subscriber.addresses.property.direction is ONETOMANY
        assert Address.customer.property.direction is MANYTOONE

        s1 = Subscriber(
            type="A", addresses=[Address(type="D"), Address(type="E")]
        )
        a1 = Address(type="B", customer=Subscriber(type="C"))

        # The backref is populated in memory in both directions.
        assert s1.addresses[0].customer is s1
        assert a1.customer.addresses[0] is a1

        sess.add_all([s1, a1])

        sess.flush()
        sess.expunge_all()

        eq_(
            sess.query(Subscriber).order_by(Subscriber.type).all(),
            [
                Subscriber(id=1, type="A"),
                Subscriber(id=2, type="B"),
                Subscriber(id=2, type="C"),
            ],
        )


class ManualBackrefTest(_fixtures.FixtureTest):

    """Test explicit relationships that are backrefs to each other."""

    run_inserts = None

    def test_o2m(self):
        """Two relationships tied together with back_populates stay in
        sync in memory and after a flush/expire."""
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(Address, back_populates="user")
            },
        )

        mapper(
            Address,
            addresses,
            properties={
                "user": relationship(User, back_populates="addresses")
            },
        )

        sess = create_session()

        u1 = User(name="u1")
        a1 = Address(email_address="foo")
        u1.addresses.append(a1)
        assert a1.user is u1

        sess.add(u1)
        sess.flush()
        sess.expire_all()
        assert sess.query(Address).one() is a1
        assert a1.user is u1
        assert a1 in u1.addresses

    def test_invalid_key(self):
        """back_populates naming an attribute that is not mapped
        ("userr") makes configure_mappers raise InvalidRequestError."""
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(Address, back_populates="userr")
            },
        )

        mapper(
            Address,
            addresses,
            properties={
                "user": relationship(User, back_populates="addresses")
            },
        )

        assert_raises(sa.exc.InvalidRequestError, configure_mappers)

    def test_invalid_target(self):
        """back_populates naming a relationship that points at a different
        mapper (Dingaling, not User) raises ArgumentError."""
        addresses, Dingaling, User, dingalings, Address, users = (
            self.tables.addresses,
            self.classes.Dingaling,
            self.classes.User,
            self.tables.dingalings,
            self.classes.Address,
            self.tables.users,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(Address, back_populates="dingaling")
            },
        )

        mapper(Dingaling, dingalings)
        mapper(
            Address,
            addresses,
            properties={"dingaling": relationship(Dingaling)},
        )

        assert_raises_message(
            sa.exc.ArgumentError,
            r"reverse_property 'dingaling' on relationship "
            r"User.addresses references "
            r"relationship Address.dingaling, "
            r"which does not "
            r"reference mapper Mapper\|User\|users",
            configure_mappers,
        )


class NoLoadBackPopulates(_fixtures.FixtureTest):

    """test the noload strategy which unlike others doesn't use
    lazyloader to set up instrumentation"""

    def test_o2m(self):
        """Appending to a lazy="noload" collection sets the many-to-one
        side named by back_populates."""
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={
                "addresses": relationship(
                    Address, back_populates="user", lazy="noload"
                )
            },
        )

        mapper(Address, addresses, properties={"user": relationship(User)})

        u1 = User()
        a1 = Address()
        u1.addresses.append(a1)
        is_(a1.user, u1)

    def test_m2o(self):
        """Assigning a lazy="noload" many-to-one adds the object to the
        collection named by back_populates."""
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(User, users, properties={"addresses": relationship(Address)})

        mapper(
            Address,
            addresses,
            properties={
                "user": relationship(
                    User, back_populates="addresses", lazy="noload"
                )
            },
        )

        u1 = User()
        a1 = Address()
        a1.user = u1
        in_(a1, u1.addresses)


class JoinConditionErrorTest(fixtures.TestBase):
    """Invalid primaryjoin / column arguments must raise ArgumentError
    when the mappers are configured."""

    def test_clauseelement_pj(self):
        """A bare column (C1.id) is rejected as a primaryjoin."""
        from sqlalchemy.ext.declarative import declarative_base

        Base = declarative_base()

        class C1(Base):
            __tablename__ = "c1"
            id = Column("id", Integer, primary_key=True)

        class C2(Base):
            __tablename__ = "c2"
            id = Column("id", Integer, primary_key=True)
            c1id = Column("c1id", Integer, ForeignKey("c1.id"))
            c2 = relationship(C1, primaryjoin=C1.id)

        assert_raises(sa.exc.ArgumentError, configure_mappers)

    def test_clauseelement_pj_false(self):
        """A plain Python ``False`` is rejected as a primaryjoin."""
        from sqlalchemy.ext.declarative import declarative_base

        Base = declarative_base()

        class C1(Base):
            __tablename__ = "c1"
            id = Column("id", Integer, primary_key=True)

        class C2(Base):
            __tablename__ = "c2"
            id = Column("id", Integer, primary_key=True)
            c1id = Column("c1id", Integer, ForeignKey("c1.id"))
            # "x" == "y" is evaluated by Python itself and yields False.
            c2 = relationship(C1, primaryjoin="x" == "y")

        assert_raises(sa.exc.ArgumentError, configure_mappers)

    def test_only_column_elements(self):
        """A Join object (t1.join(t2)) is rejected as a primaryjoin."""
        m = MetaData()
        t1 = Table(
            "t1",
            m,
            Column("id", Integer, primary_key=True),
            Column("foo_id", Integer, ForeignKey("t2.id")),
        )
        t2 = Table("t2", m, Column("id", Integer, primary_key=True))

        class C1(object):
            pass

        class C2(object):
            pass

        mapper(
            C1,
t1, properties={"c2": relationship(C2, primaryjoin=t1.join(t2))}, ) mapper(C2, t2) assert_raises(sa.exc.ArgumentError, configure_mappers) def test_invalid_string_args(self): from sqlalchemy.ext.declarative import declarative_base for argname, arg in [ ("remote_side", ["c1.id"]), ("remote_side", ["id"]), ("foreign_keys", ["c1id"]), ("foreign_keys", ["C2.c1id"]), ("order_by", ["id"]), ]: clear_mappers() kw = {argname: arg} Base = declarative_base() class C1(Base): __tablename__ = "c1" id = Column("id", Integer, primary_key=True) class C2(Base): __tablename__ = "c2" id_ = Column("id", Integer, primary_key=True) c1id = Column("c1id", Integer, ForeignKey("c1.id")) c2 = relationship(C1, **kw) assert_raises_message( sa.exc.ArgumentError, "Column-based expression object expected " "for argument '%s'; got: '%s', type %r" % (argname, arg[0], type(arg[0])), configure_mappers, ) def test_fk_error_not_raised_unrelated(self): m = MetaData() t1 = Table( "t1", m, Column("id", Integer, primary_key=True), Column("foo_id", Integer, ForeignKey("t2.nonexistent_id")), ) t2 = Table("t2", m, Column("id", Integer, primary_key=True)) # noqa t3 = Table( "t3", m, Column("id", Integer, primary_key=True), Column("t1id", Integer, ForeignKey("t1.id")), ) class C1(object): pass class C2(object): pass mapper(C1, t1, properties={"c2": relationship(C2)}) mapper(C2, t3) assert C1.c2.property.primaryjoin.compare(t1.c.id == t3.c.t1id) def test_join_error_raised(self): m = MetaData() t1 = Table("t1", m, Column("id", Integer, primary_key=True)) t2 = Table("t2", m, Column("id", Integer, primary_key=True)) # noqa t3 = Table( "t3", m, Column("id", Integer, primary_key=True), Column("t1id", Integer), ) class C1(object): pass class C2(object): pass mapper(C1, t1, properties={"c2": relationship(C2)}) mapper(C2, t3) assert_raises(sa.exc.ArgumentError, configure_mappers) def teardown(self): clear_mappers() class TypeMatchTest(fixtures.MappedTest): """test errors raised when trying to add items whose type is not 
handled by a relationship""" @classmethod def define_tables(cls, metadata): Table( "a", metadata, Column( "aid", Integer, primary_key=True, test_needs_autoincrement=True ), Column("adata", String(30)), ) Table( "b", metadata, Column( "bid", Integer, primary_key=True, test_needs_autoincrement=True ), Column("a_id", Integer, ForeignKey("a.aid")), Column("bdata", String(30)), ) Table( "c", metadata, Column( "cid", Integer, primary_key=True, test_needs_autoincrement=True ), Column("b_id", Integer, ForeignKey("b.bid")), Column("cdata", String(30)), ) Table( "d", metadata, Column( "did", Integer, primary_key=True, test_needs_autoincrement=True ), Column("a_id", Integer, ForeignKey("a.aid")), Column("ddata", String(30)), ) def test_o2m_oncascade(self): a, c, b = (self.tables.a, self.tables.c, self.tables.b) class A(fixtures.BasicEntity): pass class B(fixtures.BasicEntity): pass class C(fixtures.BasicEntity): pass mapper(A, a, properties={"bs": relationship(B)}) mapper(B, b) mapper(C, c) a1 = A() b1 = B() c1 = C() a1.bs.append(b1) a1.bs.append(c1) sess = create_session() try: sess.add(a1) assert False except AssertionError as err: eq_( str(err), "Attribute 'bs' on class '%s' doesn't handle " "objects of type '%s'" % (A, C), ) def test_o2m_onflush(self): a, c, b = (self.tables.a, self.tables.c, self.tables.b) class A(fixtures.BasicEntity): pass class B(fixtures.BasicEntity): pass class C(fixtures.BasicEntity): pass mapper(A, a, properties={"bs": relationship(B, cascade="none")}) mapper(B, b) mapper(C, c) a1 = A() b1 = B() c1 = C() a1.bs.append(b1) a1.bs.append(c1) sess = create_session() sess.add(a1) sess.add(b1) sess.add(c1) assert_raises_message( sa.orm.exc.FlushError, "Attempting to flush an item", sess.flush ) def test_o2m_nopoly_onflush(self): a, c, b = (self.tables.a, self.tables.c, self.tables.b) class A(fixtures.BasicEntity): pass class B(fixtures.BasicEntity): pass class C(B): pass mapper(A, a, properties={"bs": relationship(B, cascade="none")}) mapper(B, b) 
# NOTE(review): the statements down to the first ``def`` below continue
# TypeMatchTest.test_o2m_nopoly_onflush from the previous physical line.
        mapper(C, c, inherits=B)

        a1 = A()
        b1 = B()
        c1 = C()
        a1.bs.append(b1)
        a1.bs.append(c1)
        sess = create_session()
        sess.add(a1)
        sess.add(b1)
        sess.add(c1)
        assert_raises_message(
            sa.orm.exc.FlushError, "Attempting to flush an item", sess.flush
        )

    def test_m2o_nopoly_onflush(self):
        """Assign a B (mapped with inherits=A) to the many-to-one D.a,
        declared against A with cascade="none"; the flush is expected to
        raise FlushError."""
        a, b, d = (self.tables.a, self.tables.b, self.tables.d)

        class A(fixtures.BasicEntity):
            pass

        class B(A):
            pass

        class D(fixtures.BasicEntity):
            pass

        mapper(A, a)
        mapper(B, b, inherits=A)
        mapper(D, d, properties={"a": relationship(A, cascade="none")})
        b1 = B()
        d1 = D()
        d1.a = b1
        sess = create_session()
        sess.add(b1)
        sess.add(d1)
        assert_raises_message(
            sa.orm.exc.FlushError, "Attempting to flush an item", sess.flush
        )

    def test_m2o_oncascade(self):
        """Assign an unrelated B to the many-to-one D.a; adding d1 to the
        session raises AssertionError."""
        a, b, d = (self.tables.a, self.tables.b, self.tables.d)

        class A(fixtures.BasicEntity):
            pass

        class B(fixtures.BasicEntity):
            pass

        class D(fixtures.BasicEntity):
            pass

        mapper(A, a)
        mapper(B, b)
        mapper(D, d, properties={"a": relationship(A)})
        b1 = B()
        d1 = D()
        d1.a = b1
        sess = create_session()
        assert_raises_message(
            AssertionError, "doesn't handle objects of type", sess.add, d1
        )


class TypedAssociationTable(fixtures.MappedTest):
    """Many-to-many through an association table whose key columns use a
    custom TypeDecorator."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 and association table t3, all keyed on columns of
        type MySpecialType."""

        class MySpecialType(sa.types.TypeDecorator):
            impl = String

            # Adds a four-character "lala" prefix on the way in and strips
            # four characters on the way out.
            # NOTE(review): neither method guards against value being None.
            def process_bind_param(self, value, dialect):
                return "lala" + value

            def process_result_value(self, value, dialect):
                return value[4:]

        Table(
            "t1",
            metadata,
            Column("col1", MySpecialType(30), primary_key=True),
            Column("col2", String(30)),
        )
        Table(
            "t2",
            metadata,
            Column("col1", MySpecialType(30), primary_key=True),
            Column("col2", String(30)),
        )
        Table(
            "t3",
            metadata,
            Column("t1c1", MySpecialType(30), ForeignKey("t1.col1")),
            Column("t2c1", MySpecialType(30), ForeignKey("t2.col1")),
        )

    def test_m2m(self):
        """Many-to-many tables with special types for candidate keys."""

        t2, t3, t1 = (self.tables.t2, self.tables.t3, self.tables.t1)

        class T1(fixtures.BasicEntity):
            pass

        class T2(fixtures.BasicEntity):
            pass

        mapper(T2, t2)
        mapper(
            T1,
            t1,
# NOTE(review): the first lines continue the mapper() call opened in
# TypedAssociationTable.test_m2m on the previous physical line.
            properties={"t2s": relationship(T2, secondary=t3, backref="t1s")},
        )

        a = T1()
        a.col1 = "aid"
        b = T2()
        b.col1 = "bid"
        c = T2()
        c.col1 = "cid"
        a.t2s.append(b)
        a.t2s.append(c)
        sess = create_session()
        sess.add(a)
        sess.flush()

        # Two association rows after the first flush, one after the remove.
        eq_(select([func.count("*")]).select_from(t3).scalar(), 2)

        a.t2s.remove(c)
        sess.flush()

        eq_(select([func.count("*")]).select_from(t3).scalar(), 1)


class CustomOperatorTest(fixtures.MappedTest, AssertsCompiledSQL):

    """test op() in conjunction with join conditions"""

    run_create_tables = run_deletes = None

    __dialect__ = "default"

    @classmethod
    def define_tables(cls, metadata):
        """Create tables a and b, each with an "id" and a "foo" column."""
        Table(
            "a",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", String(50)),
        )
        Table(
            "b",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", String(50)),
        )

    def test_join_on_custom_op(self):
        """A primaryjoin built with op("&*", is_comparison=True) is
        rendered as the ON clause of query.join()."""

        class A(fixtures.BasicEntity):
            pass

        class B(fixtures.BasicEntity):
            pass

        mapper(
            A,
            self.tables.a,
            properties={
                "bs": relationship(
                    B,
                    primaryjoin=self.tables.a.c.foo.op(
                        "&*", is_comparison=True
                    )(foreign(self.tables.b.c.foo)),
                    viewonly=True,
                )
            },
        )
        mapper(B, self.tables.b)
        self.assert_compile(
            Session().query(A).join(A.bs),
            "SELECT a.id AS a_id, a.foo AS a_foo "
            "FROM a JOIN b ON a.foo &* b.foo",
        )


class ViewOnlyHistoryTest(fixtures.MappedTest):
    """Attribute history and flush behavior when one or both sides of a
    relationship are viewonly."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1 and t2; t2.t1id is a ForeignKey to t1.id."""
        Table(
            "t1",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(40)),
        )
        Table(
            "t2",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(40)),
            Column("t1id", Integer, ForeignKey("t1.id")),
        )

    def _assert_fk(self, a1, b1, is_set):
        """Flush a1 and b1 in a new Session and check b1.t1id.

        With ``is_set`` true, b1.t1id must equal a1.id; otherwise it must
        be None.  Returns the Session used for the flush.
        """
        s = Session(testing.db)
        s.add_all([a1, b1])
        s.flush()

        if is_set:
            eq_(b1.t1id, a1.id)
        else:
            eq_(b1.t1id, None)

        return s

    def test_o2m_viewonly_oneside(self):
        """viewonly one-to-many with a writable many-to-one backref: only
        the backref side records history and becomes dirty."""

        class A(fixtures.ComparableEntity):
            pass

        class B(fixtures.ComparableEntity):
            pass

        mapper(
            A,
            self.tables.t1,
            properties={
                "bs": relationship(
                    B, viewonly=True, backref=backref("a", viewonly=False)
                )
            },
        )
        mapper(B, self.tables.t2)

        a1 = A()
        b1 = B()
        a1.bs.append(b1)
        assert b1.a is a1
        assert not inspect(a1).attrs.bs.history.has_changes()
        assert inspect(b1).attrs.a.history.has_changes()

        sess = self._assert_fk(a1, b1, True)

        a1.bs.remove(b1)
        assert a1 not in sess.dirty
        assert b1 in sess.dirty

    def test_m2o_viewonly_oneside(self):
        """Writable one-to-many with a viewonly many-to-one backref: only
        the collection side records history and becomes dirty."""

        class A(fixtures.ComparableEntity):
            pass

        class B(fixtures.ComparableEntity):
            pass

        mapper(
            A,
            self.tables.t1,
            properties={
                "bs": relationship(
                    B, viewonly=False, backref=backref("a", viewonly=True)
                )
            },
        )
        mapper(B, self.tables.t2)

        a1 = A()
        b1 = B()
        b1.a = a1
        assert b1 in a1.bs
        assert inspect(a1).attrs.bs.history.has_changes()
        assert not inspect(b1).attrs.a.history.has_changes()

        sess = self._assert_fk(a1, b1, True)

        a1.bs.remove(b1)
        assert a1 in sess.dirty
        assert b1 not in sess.dirty

    def test_o2m_viewonly_only(self):
        """A viewonly one-to-many with no backref records no history and
        leaves the foreign key unset on flush."""

        class A(fixtures.ComparableEntity):
            pass

        class B(fixtures.ComparableEntity):
            pass

        mapper(
            A,
            self.tables.t1,
            properties={"bs": relationship(B, viewonly=True)},
        )
        mapper(B, self.tables.t2)

        a1 = A()
        b1 = B()
        a1.bs.append(b1)
        assert not inspect(a1).attrs.bs.history.has_changes()

        self._assert_fk(a1, b1, False)

    def test_m2o_viewonly_only(self):
        """A viewonly many-to-one with no backref records no history and
        leaves the foreign key unset on flush."""

        class A(fixtures.ComparableEntity):
            pass

        class B(fixtures.ComparableEntity):
            pass

        mapper(A, self.tables.t1)
        mapper(
            B, self.tables.t2, properties={"a": relationship(A, viewonly=True)}
        )

        a1 = A()
        b1 = B()
        b1.a = a1
        assert not inspect(b1).attrs.a.history.has_changes()

        self._assert_fk(a1, b1, False)


class ViewOnlyM2MBackrefTest(fixtures.MappedTest):
    """Many-to-many relationship whose backref is viewonly."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 and the association table t1t2."""
        Table(
            "t1",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(40)),
        )
        Table(
            "t2",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(40)),
        )
        Table(
            "t1t2",
            metadata,
            Column("t1id", Integer, ForeignKey("t1.id"), primary_key=True),
            Column("t2id", Integer, ForeignKey("t2.id"), primary_key=True),
        )

    def test_viewonly(self):
        """Populating the viewonly backref "as_" records no history on
        it, yet both directions load after flushing a1."""
        t1t2, t2, t1 = (self.tables.t1t2, self.tables.t2, self.tables.t1)

        class A(fixtures.ComparableEntity):
            pass

        class B(fixtures.ComparableEntity):
            pass

        mapper(
            A,
            t1,
            properties={
                "bs": relationship(
                    B, secondary=t1t2, backref=backref("as_", viewonly=True)
                )
            },
        )
        mapper(B, t2)

        sess = create_session()
        a1 = A()
        b1 = B(as_=[a1])

        assert not inspect(b1).attrs.as_.history.has_changes()

        sess.add(a1)
        sess.flush()
        eq_(sess.query(A).first(), A(bs=[B(id=b1.id)]))
        eq_(sess.query(B).first(), B(as_=[A(id=a1.id)]))


class ViewOnlyOverlappingNames(fixtures.MappedTest):

    """'viewonly' mappings with overlapping PK column names."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 (FK to t1) and t3 (FK to t2); every table names
        its primary key "id"."""
        Table(
            "t1",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(40)),
        )
        Table(
            "t2",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(40)),
            Column("t1id", Integer, ForeignKey("t1.id")),
        )
        Table(
            "t3",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(40)),
            Column("t2id", Integer, ForeignKey("t2.id")),
        )

    def test_three_table_view(self):
        """A three table join with overlapping PK names.

        A third table is pulled into the primary join condition using
        overlapping PK column names and should not produce 'conflicting
        column' error.

        """

        t2, t3, t1 = (self.tables.t2, self.tables.t3, self.tables.t1)

        class C1(fixtures.BasicEntity):
            pass

        class C2(fixtures.BasicEntity):
            pass

        class C3(fixtures.BasicEntity):
            pass

        mapper(
            C1,
            t1,
            properties={
                "t2s": relationship(C2),
                "t2_view": relationship(
                    C2,
                    viewonly=True,
                    primaryjoin=sa.and_(
                        t1.c.id == t2.c.t1id,
                        t3.c.t2id == t2.c.id,
                        t3.c.data == t1.c.data,
                    ),
                ),
            },
        )
        mapper(C2, t2)
        mapper(C3, t3, properties={"t2": relationship(C2)})

        c1 = C1()
        c1.data = "c1data"
        c2a = C2()
        c1.t2s.append(c2a)
        c2b = C2()
        c1.t2s.append(c2b)
        c3 = C3()
        c3.data = "c1data"
        c3.t2 = c2b
        sess = create_session()
        sess.add(c1)
        sess.add(c3)
        sess.flush()
        sess.expunge_all()

        c1 = sess.query(C1).get(c1.id)
        # Only c2b has a C3 whose data matches c1.data, so the view holds
        # c2b alone while the plain relationship holds both.
        assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
        assert set([x.id for x in c1.t2_view]) == set([c2b.id])


class ViewOnlyUniqueNames(fixtures.MappedTest):

    """'viewonly' mappings with unique PK column names."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 (FK to t1) and t3 (FK to t2); each table uses a
        distinct primary key column name."""
        Table(
            "t1",
            metadata,
            Column(
                "t1id",
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            ),
            Column("data", String(40)),
        )
        Table(
            "t2",
            metadata,
            Column(
                "t2id",
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            ),
            Column("data", String(40)),
            Column("t1id_ref", Integer, ForeignKey("t1.t1id")),
        )
        Table(
            "t3",
            metadata,
            Column(
                "t3id",
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            ),
            Column("data", String(40)),
            Column("t2id_ref", Integer, ForeignKey("t2.t2id")),
        )

    def test_three_table_view(self):
        """A three table join with unique PK names.

        A third table is pulled into the primary join condition using
        unique PK column names and should not produce 'mapper has no
        columnX' error.

        """

        t2, t3, t1 = (self.tables.t2, self.tables.t3, self.tables.t1)

        class C1(fixtures.BasicEntity):
            pass

        class C2(fixtures.BasicEntity):
            pass

        class C3(fixtures.BasicEntity):
            pass

        mapper(
            C1,
            t1,
            properties={
                "t2s": relationship(C2),
                "t2_view": relationship(
                    C2,
                    viewonly=True,
                    primaryjoin=sa.and_(
                        t1.c.t1id == t2.c.t1id_ref,
                        t3.c.t2id_ref == t2.c.t2id,
                        t3.c.data == t1.c.data,
                    ),
                ),
            },
        )
        mapper(C2, t2)
        mapper(C3, t3, properties={"t2": relationship(C2)})

        c1 = C1()
        c1.data = "c1data"
        c2a = C2()
        c1.t2s.append(c2a)
        c2b = C2()
        c1.t2s.append(c2b)
        c3 = C3()
        c3.data = "c1data"
        c3.t2 = c2b

        sess = create_session()
        sess.add_all((c1, c3))
        sess.flush()
        sess.expunge_all()

        c1 = sess.query(C1).get(c1.t1id)
        # Only c2b has a C3 whose data matches c1.data.
        assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
        assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])


class ViewOnlyLocalRemoteM2M(fixtures.TestBase):

    """test that local-remote is correctly determined for m2m"""

    def test_local_remote(self):
        """The viewonly and the plain many-to-many relationship report
        the same local_remote_pairs."""
        meta = MetaData()

        t1 = Table("t1", meta, Column("id", Integer, primary_key=True))
        t2 = Table("t2", meta, Column("id", Integer, primary_key=True))
        t12 = Table(
            "tab",
            meta,
            Column("t1_id", Integer, ForeignKey("t1.id")),
            Column("t2_id", Integer, ForeignKey("t2.id")),
        )

        class A(object):
            pass

        class B(object):
            pass

        mapper(B, t2)
        m = mapper(
            A,
            t1,
            properties=dict(
                b_view=relationship(B, secondary=t12, viewonly=True),
                b_plain=relationship(B, secondary=t12),
            ),
        )
        configure_mappers()
        # Chained comparison: both properties must equal each other and
        # the expected list of pairs.
        assert (
            m.get_property("b_view").local_remote_pairs
            == m.get_property("b_plain").local_remote_pairs
            == [(t1.c.id, t12.c.t1_id), (t2.c.id, t12.c.t2_id)]
        )


class ViewOnlyNonEquijoin(fixtures.MappedTest):

    """'viewonly' mappings based on non-equijoins."""

    @classmethod
    def define_tables(cls, metadata):
        """Create "foos" and "bars"; bars.fid has no ForeignKey."""
        Table("foos", metadata, Column("id", Integer, primary_key=True))
        Table(
            "bars",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("fid", Integer),
        )

    def test_viewonly_join(self):
        """Foo.bars joins on ``foos.id > bars.fid`` and loads every Bar
        whose fid is below the Foo's id."""
        bars, foos = self.tables.bars, self.tables.foos

        class Foo(fixtures.ComparableEntity):
            pass

        class Bar(fixtures.ComparableEntity):
            pass

        mapper(
            Foo,
            foos,
            properties={
                "bars": relationship(
                    Bar,
                    primaryjoin=foos.c.id > bars.c.fid,
                    foreign_keys=[bars.c.fid],
                    viewonly=True,
                )
            },
        )

        mapper(Bar, bars)

        sess = create_session()
        sess.add_all(
            (
                Foo(id=4),
                Foo(id=9),
                Bar(id=1, fid=2),
                Bar(id=2, fid=3),
                Bar(id=3, fid=6),
                Bar(id=4, fid=7),
            )
        )
        sess.flush()

        # Query through a second session.
        sess = create_session()
        eq_(
            sess.query(Foo).filter_by(id=4).one(),
            Foo(id=4, bars=[Bar(fid=2), Bar(fid=3)]),
        )
        eq_(
            sess.query(Foo).filter_by(id=9).one(),
            Foo(id=9, bars=[Bar(fid=2), Bar(fid=3), Bar(fid=6), Bar(fid=7)]),
        )


class ViewOnlyRepeatedRemoteColumn(fixtures.MappedTest):

    """'viewonly' mappings that contain the same 'remote' column twice"""

    @classmethod
    def define_tables(cls, metadata):
        """Create "foos" with two ForeignKey columns (bid1, bid2) that
        both reference bars.id, and "bars"."""
        Table(
            "foos",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("bid1", Integer, ForeignKey("bars.id")),
            Column("bid2", Integer, ForeignKey("bars.id")),
        )

        Table(
            "bars",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(50)),
        )

    def test_relationship_on_or(self):
        """Foo.bars loads the Bars matched by either bid1 or bid2."""
        bars, foos = self.tables.bars, self.tables.foos

        class Foo(fixtures.ComparableEntity):
            pass

        class Bar(fixtures.ComparableEntity):
            pass

        mapper(
            Foo,
            foos,
            properties={
                "bars": relationship(
                    Bar,
                    primaryjoin=sa.or_(
                        bars.c.id == foos.c.bid1, bars.c.id == foos.c.bid2
                    ),
                    uselist=True,
                    viewonly=True,
                )
            },
        )
        mapper(Bar, bars)

        sess = create_session()
        b1 = Bar(id=1, data="b1")
        b2 = Bar(id=2, data="b2")
        b3 = Bar(id=3, data="b3")
        f1 = Foo(bid1=1, bid2=2)
        f2 = Foo(bid1=3, bid2=None)

        # The relationship is viewonly, so Bars are flushed before the
        # Foos that reference them by explicit ids.
        sess.add_all((b1, b2, b3))
        sess.flush()

        sess.add_all((f1, f2))
        sess.flush()

        sess.expunge_all()
        eq_(
            sess.query(Foo).filter_by(id=f1.id).one(),
            Foo(bars=[Bar(data="b1"), Bar(data="b2")]),
        )
        eq_(
            sess.query(Foo).filter_by(id=f2.id).one(),
            Foo(bars=[Bar(data="b3")]),
        )


class ViewOnlyRepeatedLocalColumn(fixtures.MappedTest):

    """'viewonly' mappings that contain the same 'local' column twice"""

    @classmethod
    def define_tables(cls, metadata):
        """Create "foos" and "bars"; bars has two ForeignKey columns
        (fid1, fid2) that both reference foos.id."""
        Table(
            "foos",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(50)),
        )

        Table(
            "bars",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("fid1", Integer, ForeignKey("foos.id")),
            Column("fid2", Integer, ForeignKey("foos.id")),
            Column("data", String(50)),
        )

    def test_relationship_on_or(self):
        """Foo.bars loads the Bars that reference the Foo through either
        fid1 or fid2."""
        bars, foos = self.tables.bars, self.tables.foos

        class Foo(fixtures.ComparableEntity):
            pass

        class Bar(fixtures.ComparableEntity):
            pass

        mapper(
            Foo,
            foos,
            properties={
                "bars": relationship(
                    Bar,
                    primaryjoin=sa.or_(
                        bars.c.fid1 == foos.c.id, bars.c.fid2 == foos.c.id
                    ),
                    viewonly=True,
                )
            },
        )
        mapper(Bar, bars)

        sess = create_session()
        f1 = Foo(id=1, data="f1")
        f2 = Foo(id=2, data="f2")
        b1 = Bar(fid1=1, data="b1")
        b2 = Bar(fid2=1, data="b2")
        b3 = Bar(fid1=2, data="b3")
        b4 = Bar(fid1=1, fid2=2, data="b4")

        sess.add_all((f1, f2))
        sess.flush()

        sess.add_all((b1, b2, b3, b4))
        sess.flush()

        sess.expunge_all()
        # b4 references both Foos, so it shows up in both collections.
        eq_(
            sess.query(Foo).filter_by(id=f1.id).one(),
            Foo(bars=[Bar(data="b1"), Bar(data="b2"), Bar(data="b4")]),
        )
        eq_(
            sess.query(Foo).filter_by(id=f2.id).one(),
            Foo(bars=[Bar(data="b3"), Bar(data="b4")]),
        )


class ViewOnlyComplexJoin(_RelationshipErrors, fixtures.MappedTest):

    """'viewonly' mappings with a complex join condition."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1, t2 (FK to t1), t3 and the association table
        t2tot3."""
        Table(
            "t1",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(50)),
        )
        Table(
            "t2",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(50)),
            Column("t1id", Integer, ForeignKey("t1.id")),
        )
        Table(
            "t3",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("data", String(50)),
        )
        Table(
            "t2tot3",
            metadata,
            Column("t2id", Integer, ForeignKey("t2.id")),
            Column("t3id", Integer, ForeignKey("t3.id")),
        )

    @classmethod
def setup_classes(cls): class T1(cls.Comparable): pass class T2(cls.Comparable): pass class T3(cls.Comparable): pass def test_basic(self): T1, t2, T2, T3, t3, t2tot3, t1 = ( self.classes.T1, self.tables.t2, self.classes.T2, self.classes.T3, self.tables.t3, self.tables.t2tot3, self.tables.t1, ) mapper( T1, t1, properties={ "t3s": relationship( T3, primaryjoin=sa.and_( t1.c.id == t2.c.t1id, t2.c.id == t2tot3.c.t2id, t3.c.id == t2tot3.c.t3id, ), viewonly=True, foreign_keys=t3.c.id, remote_side=t2.c.t1id, ) }, ) mapper( T2, t2, properties={ "t1": relationship(T1), "t3s": relationship(T3, secondary=t2tot3), }, ) mapper(T3, t3) sess = create_session() sess.add(T2(data="t2", t1=T1(data="t1"), t3s=[T3(data="t3")])) sess.flush() sess.expunge_all() a = sess.query(T1).first() eq_(a.t3s, [T3(data="t3")]) def test_remote_side_escalation(self): T1, t2, T2, T3, t3, t2tot3, t1 = ( self.classes.T1, self.tables.t2, self.classes.T2, self.classes.T3, self.tables.t3, self.tables.t2tot3, self.tables.t1, ) mapper( T1, t1, properties={ "t3s": relationship( T3, primaryjoin=sa.and_( t1.c.id == t2.c.t1id, t2.c.id == t2tot3.c.t2id, t3.c.id == t2tot3.c.t3id, ), viewonly=True, foreign_keys=t3.c.id, ) }, ) mapper( T2, t2, properties={ "t1": relationship(T1), "t3s": relationship(T3, secondary=t2tot3), }, ) mapper(T3, t3) self._assert_raises_no_local_remote(configure_mappers, "T1.t3s") class RemoteForeignBetweenColsTest(fixtures.DeclarativeMappedTest): """test a complex annotation using between(). Using declarative here as an integration test for the local() and remote() annotations in conjunction with already annotated instrumented attributes, etc. 
""" @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic class Network(fixtures.ComparableEntity, Base): __tablename__ = "network" id = Column( sa.Integer, primary_key=True, test_needs_autoincrement=True ) ip_net_addr = Column(Integer) ip_broadcast_addr = Column(Integer) addresses = relationship( "Address", primaryjoin="remote(foreign(Address.ip_addr)).between(" "Network.ip_net_addr," "Network.ip_broadcast_addr)", viewonly=True, ) class Address(fixtures.ComparableEntity, Base): __tablename__ = "address" ip_addr = Column(Integer, primary_key=True) @classmethod def insert_data(cls): Network, Address = cls.classes.Network, cls.classes.Address s = Session(testing.db) s.add_all( [ Network(ip_net_addr=5, ip_broadcast_addr=10), Network(ip_net_addr=15, ip_broadcast_addr=25), Network(ip_net_addr=30, ip_broadcast_addr=35), Address(ip_addr=17), Address(ip_addr=18), Address(ip_addr=9), Address(ip_addr=27), ] ) s.commit() def test_col_query(self): Network, Address = self.classes.Network, self.classes.Address session = Session(testing.db) eq_( session.query(Address.ip_addr) .select_from(Network) .join(Network.addresses) .filter(Network.ip_net_addr == 15) .all(), [(17,), (18,)], ) def test_lazyload(self): Network, Address = self.classes.Network, self.classes.Address session = Session(testing.db) n3 = session.query(Network).filter(Network.ip_net_addr == 5).one() eq_([a.ip_addr for a in n3.addresses], [9]) class ExplicitLocalRemoteTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "t1", metadata, Column("id", String(50), primary_key=True), Column("data", String(50)), ) Table( "t2", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("data", String(50)), Column("t1id", String(50)), ) @classmethod def setup_classes(cls): class T1(cls.Comparable): pass class T2(cls.Comparable): pass def test_onetomany_funcfk_oldstyle(self): T2, T1, t2, t1 = ( self.classes.T2, self.classes.T1, self.tables.t2, 
self.tables.t1, ) # old _local_remote_pairs mapper( T1, t1, properties={ "t2s": relationship( T2, primaryjoin=t1.c.id == sa.func.lower(t2.c.t1id), _local_remote_pairs=[(t1.c.id, t2.c.t1id)], foreign_keys=[t2.c.t1id], ) }, ) mapper(T2, t2) self._test_onetomany() def test_onetomany_funcfk_annotated(self): T2, T1, t2, t1 = ( self.classes.T2, self.classes.T1, self.tables.t2, self.tables.t1, ) # use annotation mapper( T1, t1, properties={ "t2s": relationship( T2, primaryjoin=t1.c.id == foreign(sa.func.lower(t2.c.t1id)), ) }, ) mapper(T2, t2) self._test_onetomany() def _test_onetomany(self): T2, T1, t2, t1 = ( self.classes.T2, self.classes.T1, self.tables.t2, self.tables.t1, ) is_(T1.t2s.property.direction, ONETOMANY) eq_(T1.t2s.property.local_remote_pairs, [(t1.c.id, t2.c.t1id)]) sess = create_session() a1 = T1(id="number1", data="a1") a2 = T1(id="number2", data="a2") b1 = T2(data="b1", t1id="NuMbEr1") b2 = T2(data="b2", t1id="Number1") b3 = T2(data="b3", t1id="Number2") sess.add_all((a1, a2, b1, b2, b3)) sess.flush() sess.expunge_all() eq_( sess.query(T1).first(), T1( id="number1", data="a1", t2s=[ T2(data="b1", t1id="NuMbEr1"), T2(data="b2", t1id="Number1"), ], ), ) def test_manytoone_funcfk(self): T2, T1, t2, t1 = ( self.classes.T2, self.classes.T1, self.tables.t2, self.tables.t1, ) mapper(T1, t1) mapper( T2, t2, properties={ "t1": relationship( T1, primaryjoin=t1.c.id == sa.func.lower(t2.c.t1id), _local_remote_pairs=[(t2.c.t1id, t1.c.id)], foreign_keys=[t2.c.t1id], uselist=True, ) }, ) sess = create_session() a1 = T1(id="number1", data="a1") a2 = T1(id="number2", data="a2") b1 = T2(data="b1", t1id="NuMbEr1") b2 = T2(data="b2", t1id="Number1") b3 = T2(data="b3", t1id="Number2") sess.add_all((a1, a2, b1, b2, b3)) sess.flush() sess.expunge_all() eq_( sess.query(T2).filter(T2.data.in_(["b1", "b2"])).all(), [ T2(data="b1", t1=[T1(id="number1", data="a1")]), T2(data="b2", t1=[T1(id="number1", data="a1")]), ], ) def test_onetomany_func_referent(self): T2, T1, t2, t1 = ( 
self.classes.T2, self.classes.T1, self.tables.t2, self.tables.t1, ) mapper( T1, t1, properties={ "t2s": relationship( T2, primaryjoin=sa.func.lower(t1.c.id) == t2.c.t1id, _local_remote_pairs=[(t1.c.id, t2.c.t1id)], foreign_keys=[t2.c.t1id], ) }, ) mapper(T2, t2) sess = create_session() a1 = T1(id="NuMbeR1", data="a1") a2 = T1(id="NuMbeR2", data="a2") b1 = T2(data="b1", t1id="number1") b2 = T2(data="b2", t1id="number1") b3 = T2(data="b2", t1id="number2") sess.add_all((a1, a2, b1, b2, b3)) sess.flush() sess.expunge_all() eq_( sess.query(T1).first(), T1( id="NuMbeR1", data="a1", t2s=[ T2(data="b1", t1id="number1"), T2(data="b2", t1id="number1"), ], ), ) def test_manytoone_func_referent(self): T2, T1, t2, t1 = ( self.classes.T2, self.classes.T1, self.tables.t2, self.tables.t1, ) mapper(T1, t1) mapper( T2, t2, properties={ "t1": relationship( T1, primaryjoin=sa.func.lower(t1.c.id) == t2.c.t1id, _local_remote_pairs=[(t2.c.t1id, t1.c.id)], foreign_keys=[t2.c.t1id], uselist=True, ) }, ) sess = create_session() a1 = T1(id="NuMbeR1", data="a1") a2 = T1(id="NuMbeR2", data="a2") b1 = T2(data="b1", t1id="number1") b2 = T2(data="b2", t1id="number1") b3 = T2(data="b3", t1id="number2") sess.add_all((a1, a2, b1, b2, b3)) sess.flush() sess.expunge_all() eq_( sess.query(T2).filter(T2.data.in_(["b1", "b2"])).all(), [ T2(data="b1", t1=[T1(id="NuMbeR1", data="a1")]), T2(data="b2", t1=[T1(id="NuMbeR1", data="a1")]), ], ) def test_escalation_1(self): T2, T1, t2, t1 = ( self.classes.T2, self.classes.T1, self.tables.t2, self.tables.t1, ) mapper( T1, t1, properties={ "t2s": relationship( T2, primaryjoin=t1.c.id == sa.func.lower(t2.c.t1id), _local_remote_pairs=[(t1.c.id, t2.c.t1id)], foreign_keys=[t2.c.t1id], remote_side=[t2.c.t1id], ) }, ) mapper(T2, t2) assert_raises(sa.exc.ArgumentError, sa.orm.configure_mappers) def test_escalation_2(self): T2, T1, t2, t1 = ( self.classes.T2, self.classes.T1, self.tables.t2, self.tables.t1, ) mapper( T1, t1, properties={ "t2s": relationship( T2, 
primaryjoin=t1.c.id == sa.func.lower(t2.c.t1id), _local_remote_pairs=[(t1.c.id, t2.c.t1id)], ) }, ) mapper(T2, t2) assert_raises(sa.exc.ArgumentError, sa.orm.configure_mappers) class InvalidRemoteSideTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "t1", metadata, Column("id", Integer, primary_key=True), Column("data", String(50)), Column("t_id", Integer, ForeignKey("t1.id")), ) @classmethod def setup_classes(cls): class T1(cls.Comparable): pass def test_o2m_backref(self): T1, t1 = self.classes.T1, self.tables.t1 mapper(T1, t1, properties={"t1s": relationship(T1, backref="parent")}) assert_raises_message( sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are " r"both of the same direction symbol\('ONETOMANY'\). Did you " "mean to set remote_side on the many-to-one side ?", configure_mappers, ) def test_m2o_backref(self): T1, t1 = self.classes.T1, self.tables.t1 mapper( T1, t1, properties={ "t1s": relationship( T1, backref=backref("parent", remote_side=t1.c.id), remote_side=t1.c.id, ) }, ) assert_raises_message( sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are " r"both of the same direction symbol\('MANYTOONE'\). Did you " "mean to set remote_side on the many-to-one side ?", configure_mappers, ) def test_o2m_explicit(self): T1, t1 = self.classes.T1, self.tables.t1 mapper( T1, t1, properties={ "t1s": relationship(T1, back_populates="parent"), "parent": relationship(T1, back_populates="t1s"), }, ) # can't be sure of ordering here assert_raises_message( sa.exc.ArgumentError, r"both of the same direction symbol\('ONETOMANY'\). 
Did you " "mean to set remote_side on the many-to-one side ?", configure_mappers, ) def test_m2o_explicit(self): T1, t1 = self.classes.T1, self.tables.t1 mapper( T1, t1, properties={ "t1s": relationship( T1, back_populates="parent", remote_side=t1.c.id ), "parent": relationship( T1, back_populates="t1s", remote_side=t1.c.id ), }, ) # can't be sure of ordering here assert_raises_message( sa.exc.ArgumentError, r"both of the same direction symbol\('MANYTOONE'\). Did you " "mean to set remote_side on the many-to-one side ?", configure_mappers, ) class AmbiguousFKResolutionTest(_RelationshipErrors, fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table("a", metadata, Column("id", Integer, primary_key=True)) Table( "b", metadata, Column("id", Integer, primary_key=True), Column("aid_1", Integer, ForeignKey("a.id")), Column("aid_2", Integer, ForeignKey("a.id")), ) Table("atob", metadata, Column("aid", Integer), Column("bid", Integer)) Table( "atob_ambiguous", metadata, Column("aid1", Integer, ForeignKey("a.id")), Column("bid1", Integer, ForeignKey("b.id")), Column("aid2", Integer, ForeignKey("a.id")), Column("bid2", Integer, ForeignKey("b.id")), ) @classmethod def setup_classes(cls): class A(cls.Basic): pass class B(cls.Basic): pass def test_ambiguous_fks_o2m(self): A, B = self.classes.A, self.classes.B a, b = self.tables.a, self.tables.b mapper(A, a, properties={"bs": relationship(B)}) mapper(B, b) self._assert_raises_ambig_join(configure_mappers, "A.bs", None) def test_with_fks_o2m(self): A, B = self.classes.A, self.classes.B a, b = self.tables.a, self.tables.b mapper( A, a, properties={"bs": relationship(B, foreign_keys=b.c.aid_1)} ) mapper(B, b) sa.orm.configure_mappers() assert A.bs.property.primaryjoin.compare(a.c.id == b.c.aid_1) eq_(A.bs.property._calculated_foreign_keys, set([b.c.aid_1])) def test_with_pj_o2m(self): A, B = self.classes.A, self.classes.B a, b = self.tables.a, self.tables.b mapper( A, a, properties={ "bs": relationship(B, 
primaryjoin=a.c.id == b.c.aid_1) }, ) mapper(B, b) sa.orm.configure_mappers() assert A.bs.property.primaryjoin.compare(a.c.id == b.c.aid_1) eq_(A.bs.property._calculated_foreign_keys, set([b.c.aid_1])) def test_with_annotated_pj_o2m(self): A, B = self.classes.A, self.classes.B a, b = self.tables.a, self.tables.b mapper( A, a, properties={ "bs": relationship(B, primaryjoin=a.c.id == foreign(b.c.aid_1)) }, ) mapper(B, b) sa.orm.configure_mappers() assert A.bs.property.primaryjoin.compare(a.c.id == b.c.aid_1) eq_(A.bs.property._calculated_foreign_keys, set([b.c.aid_1])) def test_no_fks_m2m(self): A, B = self.classes.A, self.classes.B a, b, a_to_b = self.tables.a, self.tables.b, self.tables.atob mapper(A, a, properties={"bs": relationship(B, secondary=a_to_b)}) mapper(B, b) self._assert_raises_no_join(sa.orm.configure_mappers, "A.bs", a_to_b) def test_ambiguous_fks_m2m(self): A, B = self.classes.A, self.classes.B a, b, a_to_b = self.tables.a, self.tables.b, self.tables.atob_ambiguous mapper(A, a, properties={"bs": relationship(B, secondary=a_to_b)}) mapper(B, b) self._assert_raises_ambig_join( configure_mappers, "A.bs", "atob_ambiguous" ) def test_with_fks_m2m(self): A, B = self.classes.A, self.classes.B a, b, a_to_b = self.tables.a, self.tables.b, self.tables.atob_ambiguous mapper( A, a, properties={ "bs": relationship( B, secondary=a_to_b, foreign_keys=[a_to_b.c.aid1, a_to_b.c.bid1], ) }, ) mapper(B, b) sa.orm.configure_mappers() class SecondaryNestedJoinTest( fixtures.MappedTest, AssertsCompiledSQL, testing.AssertsExecutionResults ): """test support for a relationship where the 'secondary' table is a compound join(). join() and joinedload() should use a "flat" alias, lazyloading needs to ensure the join renders. 
""" run_setup_mappers = "once" run_inserts = "once" run_deletes = None @classmethod def define_tables(cls, metadata): Table( "a", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), Column("b_id", ForeignKey("b.id")), ) Table( "b", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), Column("d_id", ForeignKey("d.id")), ) Table( "c", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), Column("a_id", ForeignKey("a.id")), Column("d_id", ForeignKey("d.id")), ) Table( "d", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), ) @classmethod def setup_classes(cls): class A(cls.Comparable): pass class B(cls.Comparable): pass class C(cls.Comparable): pass class D(cls.Comparable): pass @classmethod def setup_mappers(cls): A, B, C, D = cls.classes.A, cls.classes.B, cls.classes.C, cls.classes.D a, b, c, d = cls.tables.a, cls.tables.b, cls.tables.c, cls.tables.d j = sa.join(b, d, b.c.d_id == d.c.id).join(c, c.c.d_id == d.c.id) # j = join(b, d, b.c.d_id == d.c.id).join(c, c.c.d_id == d.c.id) \ # .alias() mapper( A, a, properties={ "b": relationship(B), "d": relationship( D, secondary=j, primaryjoin=and_(a.c.b_id == b.c.id, a.c.id == c.c.a_id), secondaryjoin=d.c.id == b.c.d_id, # primaryjoin=and_(a.c.b_id == j.c.b_id, a.c.id == j.c.c_a_id), # secondaryjoin=d.c.id == j.c.b_d_id, uselist=False, viewonly=True, ), }, ) mapper(B, b, properties={"d": relationship(D)}) mapper(C, c, properties={"a": relationship(A), "d": relationship(D)}) mapper(D, d) @classmethod def insert_data(cls): A, B, C, D = cls.classes.A, cls.classes.B, cls.classes.C, cls.classes.D sess = Session() a1, a2, a3, a4 = A(name="a1"), A(name="a2"), A(name="a3"), A(name="a4") b1, b2, b3, b4 = B(name="b1"), B(name="b2"), B(name="b3"), B(name="b4") c1, c2, c3, c4 = C(name="c1"), 
C(name="c2"), C(name="c3"), C(name="c4") d1, d2 = D(name="d1"), D(name="d2") a1.b = b1 a2.b = b2 a3.b = b3 a4.b = b4 c1.a = a1 c2.a = a2 c3.a = a2 c4.a = a4 c1.d = d1 c2.d = d2 c3.d = d1 c4.d = d2 b1.d = d1 b2.d = d1 b3.d = d2 b4.d = d2 sess.add_all([a1, a2, a3, a4, b1, b2, b3, b4, c1, c2, c4, c4, d1, d2]) sess.commit() def test_render_join(self): A, D = self.classes.A, self.classes.D sess = Session() self.assert_compile( sess.query(A).join(A.d), "SELECT a.id AS a_id, a.name AS a_name, a.b_id AS a_b_id " "FROM a JOIN (b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id " "JOIN c AS c_1 ON c_1.d_id = d_1.id) ON a.b_id = b_1.id " "AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id", dialect="postgresql", ) def test_render_joinedload(self): A, D = self.classes.A, self.classes.D sess = Session() self.assert_compile( sess.query(A).options(joinedload(A.d)), "SELECT a.id AS a_id, a.name AS a_name, a.b_id AS a_b_id, " "d_1.id AS d_1_id, d_1.name AS d_1_name FROM a LEFT OUTER JOIN " "(b AS b_1 JOIN d AS d_2 ON b_1.d_id = d_2.id JOIN c AS c_1 " "ON c_1.d_id = d_2.id JOIN d AS d_1 ON d_1.id = b_1.d_id) " "ON a.b_id = b_1.id AND a.id = c_1.a_id", dialect="postgresql", ) def test_render_lazyload(self): from sqlalchemy.testing.assertsql import CompiledSQL A, D = self.classes.A, self.classes.D sess = Session() a1 = sess.query(A).filter(A.name == "a1").first() def go(): a1.d # here, the "lazy" strategy has to ensure the "secondary" # table is part of the "select_from()", since it's a join(). # referring to just the columns wont actually render all those # join conditions. 
self.assert_sql_execution( testing.db, go, CompiledSQL( "SELECT d.id AS d_id, d.name AS d_name FROM b " "JOIN d ON b.d_id = d.id JOIN c ON c.d_id = d.id " "WHERE :param_1 = b.id AND :param_2 = c.a_id " "AND d.id = b.d_id", {"param_1": a1.id, "param_2": a1.id}, ), ) mapping = {"a1": "d1", "a2": None, "a3": None, "a4": "d2"} def test_join(self): A, D = self.classes.A, self.classes.D sess = Session() for a, d in sess.query(A, D).outerjoin(A.d): eq_(self.mapping[a.name], d.name if d is not None else None) def test_joinedload(self): A, D = self.classes.A, self.classes.D sess = Session() for a in sess.query(A).options(joinedload(A.d)): d = a.d eq_(self.mapping[a.name], d.name if d is not None else None) def test_lazyload(self): A, D = self.classes.A, self.classes.D sess = Session() for a in sess.query(A): d = a.d eq_(self.mapping[a.name], d.name if d is not None else None) class InvalidRelationshipEscalationTest( _RelationshipErrors, fixtures.MappedTest ): @classmethod def define_tables(cls, metadata): Table( "foos", metadata, Column("id", Integer, primary_key=True), Column("fid", Integer), ) Table( "bars", metadata, Column("id", Integer, primary_key=True), Column("fid", Integer), ) Table( "foos_with_fks", metadata, Column("id", Integer, primary_key=True), Column("fid", Integer, ForeignKey("foos_with_fks.id")), ) Table( "bars_with_fks", metadata, Column("id", Integer, primary_key=True), Column("fid", Integer, ForeignKey("foos_with_fks.id")), ) @classmethod def setup_classes(cls): class Foo(cls.Basic): pass class Bar(cls.Basic): pass def test_no_join(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper(Foo, foos, properties={"bars": relationship(Bar)}) mapper(Bar, bars) self._assert_raises_no_join(sa.orm.configure_mappers, "Foo.bars", None) def test_no_join_self_ref(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper(Foo, foos, properties={"foos": 
relationship(Foo)}) mapper(Bar, bars) self._assert_raises_no_join(configure_mappers, "Foo.foos", None) def test_no_equated(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship(Bar, primaryjoin=foos.c.id > bars.c.fid) }, ) mapper(Bar, bars) self._assert_raises_no_relevant_fks( configure_mappers, "foos.id > bars.fid", "Foo.bars", "primary" ) def test_no_equated_fks(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, primaryjoin=foos.c.id > bars.c.fid, foreign_keys=bars.c.fid, ) }, ) mapper(Bar, bars) self._assert_raises_no_equality( sa.orm.configure_mappers, "foos.id > bars.fid", "Foo.bars", "primary", ) def test_no_equated_wo_fks_works_on_relaxed(self): foos_with_fks, Foo, Bar, bars_with_fks, foos = ( self.tables.foos_with_fks, self.classes.Foo, self.classes.Bar, self.tables.bars_with_fks, self.tables.foos, ) # very unique - the join between parent/child # has no fks, but there is an fk join between two other # tables in the join condition, for those users that try creating # these big-long-string-of-joining-many-tables primaryjoins. # in this case we don't get eq_pairs, but we hit the # "works if viewonly" rule. so here we add another clause regarding # "try foreign keys". 
mapper( Foo, foos, properties={ "bars": relationship( Bar, primaryjoin=and_( bars_with_fks.c.fid == foos_with_fks.c.id, foos_with_fks.c.id == foos.c.id, ), ) }, ) mapper(Bar, bars_with_fks) self._assert_raises_no_equality( sa.orm.configure_mappers, "bars_with_fks.fid = foos_with_fks.id " "AND foos_with_fks.id = foos.id", "Foo.bars", "primary", ) def test_ambiguous_fks(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, primaryjoin=foos.c.id == bars.c.fid, foreign_keys=[foos.c.id, bars.c.fid], ) }, ) mapper(Bar, bars) self._assert_raises_ambiguous_direction( sa.orm.configure_mappers, "Foo.bars" ) def test_ambiguous_remoteside_o2m(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, primaryjoin=foos.c.id == bars.c.fid, foreign_keys=[bars.c.fid], remote_side=[foos.c.id, bars.c.fid], viewonly=True, ) }, ) mapper(Bar, bars) self._assert_raises_no_local_remote(configure_mappers, "Foo.bars") def test_ambiguous_remoteside_m2o(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, primaryjoin=foos.c.id == bars.c.fid, foreign_keys=[foos.c.id], remote_side=[foos.c.id, bars.c.fid], viewonly=True, ) }, ) mapper(Bar, bars) self._assert_raises_no_local_remote(configure_mappers, "Foo.bars") def test_no_equated_self_ref_no_fks(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "foos": relationship(Foo, primaryjoin=foos.c.id > foos.c.fid) }, ) mapper(Bar, bars) self._assert_raises_no_relevant_fks( configure_mappers, "foos.id > foos.fid", "Foo.foos", "primary" ) def test_no_equated_self_ref_no_equality(self): bars, Foo, Bar, foos = ( self.tables.bars, self.classes.Foo, 
self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "foos": relationship( Foo, primaryjoin=foos.c.id > foos.c.fid, foreign_keys=[foos.c.fid], ) }, ) mapper(Bar, bars) self._assert_raises_no_equality( configure_mappers, "foos.id > foos.fid", "Foo.foos", "primary" ) def test_no_equated_viewonly(self): bars, Bar, bars_with_fks, foos_with_fks, Foo, foos = ( self.tables.bars, self.classes.Bar, self.tables.bars_with_fks, self.tables.foos_with_fks, self.classes.Foo, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, primaryjoin=foos.c.id > bars.c.fid, viewonly=True ) }, ) mapper(Bar, bars) self._assert_raises_no_relevant_fks( sa.orm.configure_mappers, "foos.id > bars.fid", "Foo.bars", "primary", ) sa.orm.clear_mappers() mapper( Foo, foos_with_fks, properties={ "bars": relationship( Bar, primaryjoin=foos_with_fks.c.id > bars_with_fks.c.fid, viewonly=True, ) }, ) mapper(Bar, bars_with_fks) sa.orm.configure_mappers() def test_no_equated_self_ref_viewonly(self): bars, Bar, bars_with_fks, foos_with_fks, Foo, foos = ( self.tables.bars, self.classes.Bar, self.tables.bars_with_fks, self.tables.foos_with_fks, self.classes.Foo, self.tables.foos, ) mapper( Foo, foos, properties={ "foos": relationship( Foo, primaryjoin=foos.c.id > foos.c.fid, viewonly=True ) }, ) mapper(Bar, bars) self._assert_raises_no_relevant_fks( sa.orm.configure_mappers, "foos.id > foos.fid", "Foo.foos", "primary", ) sa.orm.clear_mappers() mapper( Foo, foos_with_fks, properties={ "foos": relationship( Foo, primaryjoin=foos_with_fks.c.id > foos_with_fks.c.fid, viewonly=True, ) }, ) mapper(Bar, bars_with_fks) sa.orm.configure_mappers() def test_no_equated_self_ref_viewonly_fks(self): Foo, foos = self.classes.Foo, self.tables.foos mapper( Foo, foos, properties={ "foos": relationship( Foo, primaryjoin=foos.c.id > foos.c.fid, viewonly=True, foreign_keys=[foos.c.fid], ) }, ) sa.orm.configure_mappers() eq_(Foo.foos.property.local_remote_pairs, [(foos.c.id, foos.c.fid)]) def 
test_equated(self): bars, Bar, bars_with_fks, foos_with_fks, Foo, foos = ( self.tables.bars, self.classes.Bar, self.tables.bars_with_fks, self.tables.foos_with_fks, self.classes.Foo, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship(Bar, primaryjoin=foos.c.id == bars.c.fid) }, ) mapper(Bar, bars) self._assert_raises_no_relevant_fks( configure_mappers, "foos.id = bars.fid", "Foo.bars", "primary" ) sa.orm.clear_mappers() mapper( Foo, foos_with_fks, properties={ "bars": relationship( Bar, primaryjoin=foos_with_fks.c.id == bars_with_fks.c.fid ) }, ) mapper(Bar, bars_with_fks) sa.orm.configure_mappers() def test_equated_self_ref(self): Foo, foos = self.classes.Foo, self.tables.foos mapper( Foo, foos, properties={ "foos": relationship(Foo, primaryjoin=foos.c.id == foos.c.fid) }, ) self._assert_raises_no_relevant_fks( configure_mappers, "foos.id = foos.fid", "Foo.foos", "primary" ) def test_equated_self_ref_wrong_fks(self): bars, Foo, foos = ( self.tables.bars, self.classes.Foo, self.tables.foos, ) mapper( Foo, foos, properties={ "foos": relationship( Foo, primaryjoin=foos.c.id == foos.c.fid, foreign_keys=[bars.c.id], ) }, ) self._assert_raises_no_relevant_fks( configure_mappers, "foos.id = foos.fid", "Foo.foos", "primary" ) class InvalidRelationshipEscalationTestM2M( _RelationshipErrors, fixtures.MappedTest ): @classmethod def define_tables(cls, metadata): Table("foos", metadata, Column("id", Integer, primary_key=True)) Table( "foobars", metadata, Column("fid", Integer), Column("bid", Integer) ) Table("bars", metadata, Column("id", Integer, primary_key=True)) Table( "foobars_with_fks", metadata, Column("fid", Integer, ForeignKey("foos.id")), Column("bid", Integer, ForeignKey("bars.id")), ) Table( "foobars_with_many_columns", metadata, Column("fid", Integer), Column("bid", Integer), Column("fid1", Integer), Column("bid1", Integer), Column("fid2", Integer), Column("bid2", Integer), ) @classmethod def setup_classes(cls): class Foo(cls.Basic): pass 
class Bar(cls.Basic): pass def test_no_join(self): foobars, bars, Foo, Bar, foos = ( self.tables.foobars, self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={"bars": relationship(Bar, secondary=foobars)}, ) mapper(Bar, bars) self._assert_raises_no_join(configure_mappers, "Foo.bars", "foobars") def test_no_secondaryjoin(self): foobars, bars, Foo, Bar, foos = ( self.tables.foobars, self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars, primaryjoin=foos.c.id > foobars.c.fid, ) }, ) mapper(Bar, bars) self._assert_raises_no_join(configure_mappers, "Foo.bars", "foobars") def test_no_fks(self): foobars_with_many_columns, bars, Bar, foobars, Foo, foos = ( self.tables.foobars_with_many_columns, self.tables.bars, self.classes.Bar, self.tables.foobars, self.classes.Foo, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars, primaryjoin=foos.c.id == foobars.c.fid, secondaryjoin=foobars.c.bid == bars.c.id, ) }, ) mapper(Bar, bars) sa.orm.configure_mappers() eq_(Foo.bars.property.synchronize_pairs, [(foos.c.id, foobars.c.fid)]) eq_( Foo.bars.property.secondary_synchronize_pairs, [(bars.c.id, foobars.c.bid)], ) sa.orm.clear_mappers() mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars_with_many_columns, primaryjoin=foos.c.id == foobars_with_many_columns.c.fid, secondaryjoin=foobars_with_many_columns.c.bid == bars.c.id, ) }, ) mapper(Bar, bars) sa.orm.configure_mappers() eq_( Foo.bars.property.synchronize_pairs, [(foos.c.id, foobars_with_many_columns.c.fid)], ) eq_( Foo.bars.property.secondary_synchronize_pairs, [(bars.c.id, foobars_with_many_columns.c.bid)], ) def test_local_col_setup(self): foobars_with_fks, bars, Bar, Foo, foos = ( self.tables.foobars_with_fks, self.tables.bars, self.classes.Bar, self.classes.Foo, self.tables.foos, ) # ensure m2m backref is set up 
with correct annotations # [ticket:2578] mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars_with_fks, backref="foos" ) }, ) mapper(Bar, bars) sa.orm.configure_mappers() eq_(Foo.bars.property._join_condition.local_columns, set([foos.c.id])) eq_(Bar.foos.property._join_condition.local_columns, set([bars.c.id])) def test_bad_primaryjoin(self): foobars_with_fks, bars, Bar, foobars, Foo, foos = ( self.tables.foobars_with_fks, self.tables.bars, self.classes.Bar, self.tables.foobars, self.classes.Foo, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars, primaryjoin=foos.c.id > foobars.c.fid, secondaryjoin=foobars.c.bid <= bars.c.id, ) }, ) mapper(Bar, bars) self._assert_raises_no_equality( configure_mappers, "foos.id > foobars.fid", "Foo.bars", "primary" ) sa.orm.clear_mappers() mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars_with_fks, primaryjoin=foos.c.id > foobars_with_fks.c.fid, secondaryjoin=foobars_with_fks.c.bid <= bars.c.id, ) }, ) mapper(Bar, bars) self._assert_raises_no_equality( configure_mappers, "foos.id > foobars_with_fks.fid", "Foo.bars", "primary", ) sa.orm.clear_mappers() mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars_with_fks, primaryjoin=foos.c.id > foobars_with_fks.c.fid, secondaryjoin=foobars_with_fks.c.bid <= bars.c.id, viewonly=True, ) }, ) mapper(Bar, bars) sa.orm.configure_mappers() def test_bad_secondaryjoin(self): foobars, bars, Foo, Bar, foos = ( self.tables.foobars, self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars, primaryjoin=foos.c.id == foobars.c.fid, secondaryjoin=foobars.c.bid <= bars.c.id, foreign_keys=[foobars.c.fid], ) }, ) mapper(Bar, bars) self._assert_raises_no_relevant_fks( configure_mappers, "foobars.bid <= bars.id", "Foo.bars", "secondary", ) def test_no_equated_secondaryjoin(self): foobars, bars, Foo, 
Bar, foos = ( self.tables.foobars, self.tables.bars, self.classes.Foo, self.classes.Bar, self.tables.foos, ) mapper( Foo, foos, properties={ "bars": relationship( Bar, secondary=foobars, primaryjoin=foos.c.id == foobars.c.fid, secondaryjoin=foobars.c.bid <= bars.c.id, foreign_keys=[foobars.c.fid, foobars.c.bid], ) }, ) mapper(Bar, bars) self._assert_raises_no_equality( configure_mappers, "foobars.bid <= bars.id", "Foo.bars", "secondary", ) class ActiveHistoryFlagTest(_fixtures.FixtureTest): run_inserts = None run_deletes = None def _test_attribute(self, obj, attrname, newvalue): sess = Session() sess.add(obj) oldvalue = getattr(obj, attrname) sess.commit() # expired assert attrname not in obj.__dict__ setattr(obj, attrname, newvalue) eq_( attributes.get_history(obj, attrname), ([newvalue], (), [oldvalue]) ) def test_column_property_flag(self): User, users = self.classes.User, self.tables.users mapper( User, users, properties={ "name": column_property(users.c.name, active_history=True) }, ) u1 = User(name="jack") self._test_attribute(u1, "name", "ed") def test_relationship_property_flag(self): Address, addresses, users, User = ( self.classes.Address, self.tables.addresses, self.tables.users, self.classes.User, ) mapper( Address, addresses, properties={"user": relationship(User, active_history=True)}, ) mapper(User, users) u1 = User(name="jack") u2 = User(name="ed") a1 = Address(email_address="a1", user=u1) self._test_attribute(a1, "user", u2) def test_composite_property_flag(self): Order, orders = self.classes.Order, self.tables.orders class MyComposite(object): def __init__(self, description, isopen): self.description = description self.isopen = isopen def __composite_values__(self): return [self.description, self.isopen] def __eq__(self, other): return ( isinstance(other, MyComposite) and other.description == self.description ) mapper( Order, orders, properties={ "composite": composite( MyComposite, orders.c.description, orders.c.isopen, active_history=True, ) }, 
) o1 = Order(composite=MyComposite("foo", 1)) self._test_attribute(o1, "composite", MyComposite("bar", 1)) class RelationDeprecationTest(fixtures.MappedTest): """test usage of the old 'relation' function.""" run_inserts = "once" run_deletes = None @classmethod def define_tables(cls, metadata): Table( "users_table", metadata, Column("id", Integer, primary_key=True), Column("name", String(64)), ) Table( "addresses_table", metadata, Column("id", Integer, primary_key=True), Column("user_id", Integer, ForeignKey("users_table.id")), Column("email_address", String(128)), Column("purpose", String(16)), Column("bounces", Integer, default=0), ) @classmethod def setup_classes(cls): class User(cls.Basic): pass class Address(cls.Basic): pass @classmethod def fixtures(cls): return dict( users_table=( ("id", "name"), (1, "jack"), (2, "ed"), (3, "fred"), (4, "chuck"), ), addresses_table=( ("id", "user_id", "email_address", "purpose", "bounces"), (1, 1, "jack@jack.home", "Personal", 0), (2, 1, "jack@jack.bizz", "Work", 1), (3, 2, "ed@foo.bar", "Personal", 0), (4, 3, "fred@the.fred", "Personal", 10), ), ) def test_relation(self): addresses_table, User, users_table, Address = ( self.tables.addresses_table, self.classes.User, self.tables.users_table, self.classes.Address, ) mapper( User, users_table, properties=dict(addresses=relation(Address, backref="user")), ) mapper(Address, addresses_table) session = create_session() session.query(User).filter( User.addresses.any(Address.email_address == "ed@foo.bar") ).one()