diff options
Diffstat (limited to 'test/orm/inheritance')
| -rw-r--r-- | test/orm/inheritance/_poly_fixtures.py | 10 | ||||
| -rw-r--r-- | test/orm/inheritance/test_abc_inheritance.py | 505 | ||||
| -rw-r--r-- | test/orm/inheritance/test_abc_polymorphic.py | 2 | ||||
| -rw-r--r-- | test/orm/inheritance/test_assorted_poly.py | 152 | ||||
| -rw-r--r-- | test/orm/inheritance/test_basic.py | 170 | ||||
| -rw-r--r-- | test/orm/inheritance/test_concrete.py | 110 | ||||
| -rw-r--r-- | test/orm/inheritance/test_magazine.py | 6 | ||||
| -rw-r--r-- | test/orm/inheritance/test_manytomany.py | 4 | ||||
| -rw-r--r-- | test/orm/inheritance/test_poly_linked_list.py | 2 | ||||
| -rw-r--r-- | test/orm/inheritance/test_poly_persistence.py | 20 | ||||
| -rw-r--r-- | test/orm/inheritance/test_polymorphic_rel.py | 92 | ||||
| -rw-r--r-- | test/orm/inheritance/test_productspec.py | 2 | ||||
| -rw-r--r-- | test/orm/inheritance/test_relationship.py | 143 | ||||
| -rw-r--r-- | test/orm/inheritance/test_single.py | 68 |
14 files changed, 703 insertions, 583 deletions
diff --git a/test/orm/inheritance/_poly_fixtures.py b/test/orm/inheritance/_poly_fixtures.py index 77564bcdb..4253b4bee 100644 --- a/test/orm/inheritance/_poly_fixtures.py +++ b/test/orm/inheritance/_poly_fixtures.py @@ -2,9 +2,9 @@ from sqlalchemy import ForeignKey from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy import util -from sqlalchemy.orm import create_session from sqlalchemy.orm import polymorphic_union from sqlalchemy.orm import relationship +from sqlalchemy.orm import sessionmaker from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import config @@ -221,11 +221,9 @@ class _PolymorphicFixtureBase(fixtures.MappedTest, AssertsCompiledSQL): cls.c2 = c2 = Company(name="Elbonia, Inc.") c2.employees = [e3] - sess = create_session(connection) - sess.add(c1) - sess.add(c2) - sess.flush() - sess.expunge_all() + with sessionmaker(connection, expire_on_commit=False).begin() as sess: + sess.add(c1) + sess.add(c2) cls.all_employees = [e1, e2, b1, m1, e3] cls.c1_employees = [e1, e2, b1, m1] diff --git a/test/orm/inheritance/test_abc_inheritance.py b/test/orm/inheritance/test_abc_inheritance.py index b5bff78d2..60c488be3 100644 --- a/test/orm/inheritance/test_abc_inheritance.py +++ b/test/orm/inheritance/test_abc_inheritance.py @@ -1,303 +1,270 @@ from sqlalchemy import ForeignKey from sqlalchemy import Integer from sqlalchemy import String +from sqlalchemy import testing from sqlalchemy.orm import class_mapper -from sqlalchemy.orm import create_session from sqlalchemy.orm import mapper from sqlalchemy.orm import polymorphic_union from sqlalchemy.orm import relationship from sqlalchemy.orm.interfaces import MANYTOONE from sqlalchemy.orm.interfaces import ONETOMANY from sqlalchemy.testing import fixtures +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table -def produce_test(parent, child, direction): - """produce a testcase for A->B->C inheritance with a self-referential - relationship between two of the classes, using either one-to-many or - many-to-one. +def _combinations(): + for parent in ["a", "b", "c"]: + for child in ["a", "b", "c"]: + for direction in [ONETOMANY, MANYTOONE]: + + name = "Test%sTo%s%s" % ( + parent, + child, + (direction is ONETOMANY and "O2M" or "M2O"), + ) + yield (name, parent, child, direction) - the old "no discriminator column" pattern is used. - """ +@testing.combinations( + *list(_combinations()), argnames="name,parent,child,direction", id_="saaa" +) +class ABCTest(fixtures.MappedTest): + @classmethod + def define_tables(cls, metadata): + parent, child, direction = cls.parent, cls.child, cls.direction - class ABCTest(fixtures.MappedTest): - @classmethod - def define_tables(cls, metadata): - global ta, tb, tc - ta = ["a", metadata] + ta = ["a", metadata] + ta.append( + Column( + "id", Integer, primary_key=True, test_needs_autoincrement=True, + ) + ), + ta.append(Column("a_data", String(30))) + if "a" == parent and direction == MANYTOONE: ta.append( Column( - "id", + "child_id", Integer, - primary_key=True, - test_needs_autoincrement=True, - ) - ), - ta.append(Column("a_data", String(30))) - if "a" == parent and direction == MANYTOONE: - ta.append( - Column( - "child_id", - Integer, - ForeignKey( - "%s.id" % child, use_alter=True, name="foo" - ), - ) + ForeignKey("%s.id" % child, use_alter=True, name="foo"), ) - elif "a" == child and direction == ONETOMANY: - ta.append( - Column( - "parent_id", - Integer, - ForeignKey( - "%s.id" % parent, use_alter=True, name="foo" - ), - ) + ) + elif "a" == child and direction == ONETOMANY: + ta.append( + Column( + "parent_id", + Integer, + ForeignKey("%s.id" % parent, use_alter=True, name="foo"), ) - ta = Table(*ta) - - tb = ["b", metadata] - tb.append( - Column("id", Integer, ForeignKey("a.id"), primary_key=True) ) + ta = Table(*ta) - tb.append(Column("b_data", String(30))) - - if "b" == parent and direction == MANYTOONE: - tb.append( - Column( - "child_id", - Integer, - ForeignKey( - "%s.id" % child, use_alter=True, name="foo" - ), - ) - ) - elif "b" == child and direction == ONETOMANY: - tb.append( - Column( - "parent_id", - Integer, - ForeignKey( - "%s.id" % parent, use_alter=True, name="foo" - ), - ) - ) - tb = Table(*tb) + tb = ["b", metadata] + tb.append(Column("id", Integer, ForeignKey("a.id"), primary_key=True)) - tc = ["c", metadata] - tc.append( - Column("id", Integer, ForeignKey("b.id"), primary_key=True) - ) + tb.append(Column("b_data", String(30))) - tc.append(Column("c_data", String(30))) - - if "c" == parent and direction == MANYTOONE: - tc.append( - Column( - "child_id", - Integer, - ForeignKey( - "%s.id" % child, use_alter=True, name="foo" - ), - ) - ) - elif "c" == child and direction == ONETOMANY: - tc.append( - Column( - "parent_id", - Integer, - ForeignKey( - "%s.id" % parent, use_alter=True, name="foo" - ), - ) + if "b" == parent and direction == MANYTOONE: + tb.append( + Column( + "child_id", + Integer, + ForeignKey("%s.id" % child, use_alter=True, name="foo"), ) - tc = Table(*tc) - - def teardown(self): - if direction == MANYTOONE: - parent_table = {"a": ta, "b": tb, "c": tc}[parent] - parent_table.update( - values={parent_table.c.child_id: None} - ).execute() - elif direction == ONETOMANY: - child_table = {"a": ta, "b": tb, "c": tc}[child] - child_table.update( - values={child_table.c.parent_id: None} - ).execute() - super(ABCTest, self).teardown() - - def test_roundtrip(self): - parent_table = {"a": ta, "b": tb, "c": tc}[parent] - child_table = {"a": ta, "b": tb, "c": tc}[child] - - remote_side = None - - if direction == MANYTOONE: - foreign_keys = [parent_table.c.child_id] - elif direction == ONETOMANY: - foreign_keys = [child_table.c.parent_id] - - atob = ta.c.id == tb.c.id - btoc = tc.c.id == tb.c.id - - if direction == ONETOMANY: - relationshipjoin = parent_table.c.id == child_table.c.parent_id - elif direction == MANYTOONE: - relationshipjoin = parent_table.c.child_id == child_table.c.id - if parent is child: - remote_side = [child_table.c.id] - - abcjoin = polymorphic_union( - { - "a": ta.select( - tb.c.id == None, # noqa - from_obj=[ta.outerjoin(tb, onclause=atob)], - ).subquery(), - "b": ta.join(tb, onclause=atob) - .outerjoin(tc, onclause=btoc) - .select(tc.c.id == None) - .reduce_columns() - .subquery(), # noqa - "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob), - }, - "type", - "abcjoin", ) - - bcjoin = polymorphic_union( - { - "b": ta.join(tb, onclause=atob) - .outerjoin(tc, onclause=btoc) - .select(tc.c.id == None) - .reduce_columns() - .subquery(), # noqa - "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob), - }, - "type", - "bcjoin", + elif "b" == child and direction == ONETOMANY: + tb.append( + Column( + "parent_id", + Integer, + ForeignKey("%s.id" % parent, use_alter=True, name="foo"), + ) ) + tb = Table(*tb) - class A(object): - def __init__(self, name): - self.a_data = name + tc = ["c", metadata] + tc.append(Column("id", Integer, ForeignKey("b.id"), primary_key=True)) - class B(A): - pass + tc.append(Column("c_data", String(30))) - class C(B): - pass - - mapper( - A, - ta, - polymorphic_on=abcjoin.c.type, - with_polymorphic=("*", abcjoin), - polymorphic_identity="a", - ) - mapper( - B, - tb, - polymorphic_on=bcjoin.c.type, - with_polymorphic=("*", bcjoin), - polymorphic_identity="b", - inherits=A, - inherit_condition=atob, - ) - mapper( - C, - tc, - polymorphic_identity="c", - with_polymorphic=("*", tc.join(tb, btoc).join(ta, atob)), - inherits=B, - inherit_condition=btoc, + if "c" == parent and direction == MANYTOONE: + tc.append( + Column( + "child_id", + Integer, + ForeignKey("%s.id" % child, use_alter=True, name="foo"), + ) ) - - parent_mapper = class_mapper({ta: A, tb: B, tc: C}[parent_table]) - child_mapper = class_mapper({ta: A, tb: B, tc: C}[child_table]) - - parent_class = parent_mapper.class_ - child_class = child_mapper.class_ - - parent_mapper.add_property( - "collection", - relationship( - child_mapper, - primaryjoin=relationshipjoin, - foreign_keys=foreign_keys, - order_by=child_mapper.c.id, - remote_side=remote_side, - uselist=True, - ), + elif "c" == child and direction == ONETOMANY: + tc.append( + Column( + "parent_id", + Integer, + ForeignKey("%s.id" % parent, use_alter=True, name="foo"), + ) ) - - sess = create_session() - - parent_obj = parent_class("parent1") - child_obj = child_class("child1") - somea = A("somea") - someb = B("someb") - somec = C("somec") - - # print "APPENDING", parent.__class__.__name__ , "TO", - # child.__class__.__name__ - - sess.add(parent_obj) - parent_obj.collection.append(child_obj) - if direction == ONETOMANY: - child2 = child_class("child2") - parent_obj.collection.append(child2) - sess.add(child2) - elif direction == MANYTOONE: - parent2 = parent_class("parent2") - parent2.collection.append(child_obj) - sess.add(parent2) - sess.add(somea) - sess.add(someb) - sess.add(somec) - sess.flush() - sess.expunge_all() - - # assert result via direct get() of parent object - result = sess.query(parent_class).get(parent_obj.id) - assert result.id == parent_obj.id - assert result.collection[0].id == child_obj.id - if direction == ONETOMANY: - assert result.collection[1].id == child2.id - elif direction == MANYTOONE: - result2 = sess.query(parent_class).get(parent2.id) - assert result2.id == parent2.id - assert result2.collection[0].id == child_obj.id - - sess.expunge_all() - - # assert result via polymorphic load of parent object - result = sess.query(A).filter_by(id=parent_obj.id).one() - assert result.id == parent_obj.id - assert result.collection[0].id == child_obj.id - if direction == ONETOMANY: - assert result.collection[1].id == child2.id - elif direction == MANYTOONE: - result2 = sess.query(A).filter_by(id=parent2.id).one() - assert result2.id == parent2.id - assert result2.collection[0].id == child_obj.id - - ABCTest.__name__ = "Test%sTo%s%s" % ( - parent, - child, - (direction is ONETOMANY and "O2M" or "M2O"), - ) - return ABCTest - - -# test all combinations of polymorphic a/b/c related to another of a/b/c -for parent in ["a", "b", "c"]: - for child in ["a", "b", "c"]: - for direction in [ONETOMANY, MANYTOONE]: - testclass = produce_test(parent, child, direction) - exec("%s = testclass" % testclass.__name__) - del testclass - -del produce_test + tc = Table(*tc) + + @classmethod + def setup_mappers(cls): + parent, child, direction = cls.parent, cls.child, cls.direction + ta, tb, tc = cls.tables("a", "b", "c") + parent_table = {"a": ta, "b": tb, "c": tc}[parent] + child_table = {"a": ta, "b": tb, "c": tc}[child] + + remote_side = None + + if direction == MANYTOONE: + foreign_keys = [parent_table.c.child_id] + elif direction == ONETOMANY: + foreign_keys = [child_table.c.parent_id] + + atob = ta.c.id == tb.c.id + btoc = tc.c.id == tb.c.id + + if direction == ONETOMANY: + relationshipjoin = parent_table.c.id == child_table.c.parent_id + elif direction == MANYTOONE: + relationshipjoin = parent_table.c.child_id == child_table.c.id + if parent is child: + remote_side = [child_table.c.id] + + abcjoin = polymorphic_union( + { + "a": ta.select() + .where(tb.c.id == None) # noqa + .select_from(ta.outerjoin(tb, onclause=atob)) + .subquery(), + "b": ta.join(tb, onclause=atob) + .outerjoin(tc, onclause=btoc) + .select() + .where(tc.c.id == None) + .reduce_columns() + .subquery(), # noqa + "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob), + }, + "type", + "abcjoin", + ) + + bcjoin = polymorphic_union( + { + "b": ta.join(tb, onclause=atob) + .outerjoin(tc, onclause=btoc) + .select() + .where(tc.c.id == None) + .reduce_columns() + .subquery(), # noqa + "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob), + }, + "type", + "bcjoin", + ) + + class A(cls.Comparable): + def __init__(self, name): + self.a_data = name + + class B(A): + pass + + class C(B): + pass + + mapper( + A, + ta, + polymorphic_on=abcjoin.c.type, + with_polymorphic=("*", abcjoin), + polymorphic_identity="a", + ) + mapper( + B, + tb, + polymorphic_on=bcjoin.c.type, + with_polymorphic=("*", bcjoin), + polymorphic_identity="b", + inherits=A, + inherit_condition=atob, + ) + mapper( + C, + tc, + polymorphic_identity="c", + with_polymorphic=("*", tc.join(tb, btoc).join(ta, atob)), + inherits=B, + inherit_condition=btoc, + ) + + parent_mapper = class_mapper({ta: A, tb: B, tc: C}[parent_table]) + child_mapper = class_mapper({ta: A, tb: B, tc: C}[child_table]) + + parent_mapper.add_property( + "collection", + relationship( + child_mapper, + primaryjoin=relationshipjoin, + foreign_keys=foreign_keys, + order_by=child_mapper.c.id, + remote_side=remote_side, + uselist=True, + ), + ) + + def test_roundtrip(self): + parent, child, direction = self.parent, self.child, self.direction + A, B, C = self.classes("A", "B", "C") + parent_class = {"a": A, "b": B, "c": C}[parent] + child_class = {"a": A, "b": B, "c": C}[child] + + sess = create_session() + + parent_obj = parent_class("parent1") + child_obj = child_class("child1") + somea = A("somea") + someb = B("someb") + somec = C("somec") + + # print "APPENDING", parent.__class__.__name__ , "TO", + # child.__class__.__name__ + + sess.add(parent_obj) + parent_obj.collection.append(child_obj) + if direction == ONETOMANY: + child2 = child_class("child2") + parent_obj.collection.append(child2) + sess.add(child2) + elif direction == MANYTOONE: + parent2 = parent_class("parent2") + parent2.collection.append(child_obj) + sess.add(parent2) + sess.add(somea) + sess.add(someb) + sess.add(somec) + sess.commit() + sess.close() + + # assert result via direct get() of parent object + result = sess.get(parent_class, parent_obj.id) + assert result.id == parent_obj.id + assert result.collection[0].id == child_obj.id + if direction == ONETOMANY: + assert result.collection[1].id == child2.id + elif direction == MANYTOONE: + result2 = sess.get(parent_class, parent2.id) + assert result2.id == parent2.id + assert result2.collection[0].id == child_obj.id + + sess.expunge_all() + + # assert result via polymorphic load of parent object + result = sess.query(A).filter_by(id=parent_obj.id).one() + assert result.id == parent_obj.id + assert result.collection[0].id == child_obj.id + if direction == ONETOMANY: + assert result.collection[1].id == child2.id + elif direction == MANYTOONE: + result2 = sess.query(A).filter_by(id=parent2.id).one() + assert result2.id == parent2.id + assert result2.collection[0].id == child_obj.id diff --git a/test/orm/inheritance/test_abc_polymorphic.py b/test/orm/inheritance/test_abc_polymorphic.py index cf06c9e26..0d28ef342 100644 --- a/test/orm/inheritance/test_abc_polymorphic.py +++ b/test/orm/inheritance/test_abc_polymorphic.py @@ -2,10 +2,10 @@ from sqlalchemy import ForeignKey from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy import testing -from sqlalchemy.orm import create_session from sqlalchemy.orm import mapper from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table diff --git a/test/orm/inheritance/test_assorted_poly.py b/test/orm/inheritance/test_assorted_poly.py index fe6880591..2767607cb 100644 --- a/test/orm/inheritance/test_assorted_poly.py +++ b/test/orm/inheritance/test_assorted_poly.py @@ -16,18 +16,19 @@ from sqlalchemy import util from sqlalchemy.orm import class_mapper from sqlalchemy.orm import column_property from sqlalchemy.orm import contains_eager -from sqlalchemy.orm import create_session from sqlalchemy.orm import join from sqlalchemy.orm import joinedload from sqlalchemy.orm import mapper from sqlalchemy.orm import polymorphic_union from sqlalchemy.orm import relationship from sqlalchemy.orm import Session +from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import with_polymorphic from sqlalchemy.orm.interfaces import MANYTOONE from sqlalchemy.testing import AssertsExecutionResults from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table @@ -115,8 +116,8 @@ class RelationshipTest1(fixtures.MappedTest): session.flush() session.expunge_all() - p = session.query(Person).get(p.person_id) - m = session.query(Manager).get(m.person_id) + p = session.get(Person, p.person_id) + m = session.get(Manager, m.person_id) assert p.manager is m def test_descendant_refs_parent(self): @@ -147,8 +148,8 @@ class RelationshipTest1(fixtures.MappedTest): session.flush() session.expunge_all() - p = session.query(Person).get(p.person_id) - m = session.query(Manager).get(m.person_id) + p = session.get(Person, p.person_id) + m = session.get(Manager, m.person_id) assert m.employee is p @@ -215,9 +216,9 @@ class RelationshipTest2(fixtures.MappedTest): if jointype == "join1": poly_union = polymorphic_union( { - "person": people.select( - people.c.type == "person" - ).subquery(), + "person": people.select() + .where(people.c.type == "person") + .subquery(), "manager": join( people, managers, @@ -230,9 +231,9 @@ class RelationshipTest2(fixtures.MappedTest): elif jointype == "join2": poly_union = polymorphic_union( { - "person": people.select( - people.c.type == "person" - ).subquery(), + "person": people.select() + .where(people.c.type == "person") + .subquery(), "manager": managers.join( people, people.c.person_id == managers.c.person_id ), @@ -306,8 +307,8 @@ class RelationshipTest2(fixtures.MappedTest): sess.flush() sess.expunge_all() - p = sess.query(Person).get(p.person_id) - m = sess.query(Manager).get(m.person_id) + p = sess.get(Person, p.person_id) + m = sess.get(Manager, m.person_id) assert m.colleague is p if usedata: assert m.data.data == "ms data" @@ -377,9 +378,9 @@ class RelationshipTest3(fixtures.MappedTest): "manager": managers.join( people, people.c.person_id == managers.c.person_id ), - "person": people.select( - people.c.type == "person" - ).subquery(), + "person": people.select() + .where(people.c.type == "person") + .subquery(), }, None, ) @@ -391,9 +392,9 @@ class RelationshipTest3(fixtures.MappedTest): managers, people.c.person_id == managers.c.person_id, ), - "person": people.select( - people.c.type == "person" - ).subquery(), + "person": people.select() + .where(people.c.type == "person") + .subquery(), }, None, ) @@ -477,10 +478,10 @@ class RelationshipTest3(fixtures.MappedTest): sess.flush() sess.expunge_all() - p = sess.query(Person).get(p.person_id) - p2 = sess.query(Person).get(p2.person_id) - p3 = sess.query(Person).get(p3.person_id) - m = sess.query(Person).get(m.person_id) + p = sess.get(Person, p.person_id) + p2 = sess.get(Person, p2.person_id) + p3 = sess.get(Person, p3.person_id) + m = sess.get(Person, m.person_id) assert len(p.colleagues) == 1 assert p.colleagues == [p2] assert m.colleagues == [p3] @@ -635,17 +636,15 @@ class RelationshipTest4(fixtures.MappedTest): session.expunge_all() def go(): - testcar = ( - session.query(Car) - .options(joinedload("employee")) - .get(car1.car_id) + testcar = session.get( + Car, car1.car_id, options=[joinedload("employee")] ) assert str(testcar.employee) == "Engineer E4, status X" self.assert_sql_count(testing.db, go, 1) - car1 = session.query(Car).get(car1.car_id) - usingGet = session.query(person_mapper).get(car1.owner) + car1 = session.get(Car, car1.car_id) + usingGet = session.get(person_mapper, car1.owner) usingProperty = car1.employee assert str(engineer4) == "Engineer E4, status X" @@ -656,10 +655,8 @@ class RelationshipTest4(fixtures.MappedTest): # and now for the lightning round, eager ! def go(): - testcar = ( - session.query(Car) - .options(joinedload("employee")) - .get(car1.car_id) + testcar = session.get( + Car, car1.car_id, options=[joinedload("employee")], ) assert str(testcar.employee) == "Engineer E4, status X" @@ -864,8 +861,8 @@ class RelationshipTest6(fixtures.MappedTest): sess.flush() sess.expunge_all() - m = sess.query(Manager).get(m.person_id) - m2 = sess.query(Manager).get(m2.person_id) + m = sess.get(Manager, m.person_id) + m2 = sess.get(Manager, m2.person_id) assert m.colleague is m2 @@ -984,7 +981,8 @@ class RelationshipTest7(fixtures.MappedTest): car_join = polymorphic_union( { "car": cars.outerjoin(offroad_cars) - .select(offroad_cars.c.car_id == None) + .select() + .where(offroad_cars.c.car_id == None) .reduce_columns() .subquery(), "offroad": cars.join(offroad_cars), @@ -1266,42 +1264,38 @@ class GenerativeTest(fixtures.MappedTest, AssertsExecutionResults): Status, Person, Engineer, Manager, Car = cls.classes( "Status", "Person", "Engineer", "Manager", "Car" ) - session = create_session(connection) + with sessionmaker(connection).begin() as session: - active = Status(name="active") - dead = Status(name="dead") + active = Status(name="active") + dead = Status(name="dead") - session.add(active) - session.add(dead) - session.flush() - - # TODO: we haven't created assertions for all - # the data combinations created here + session.add(active) + session.add(dead) - # creating 5 managers named from M1 to M5 - # and 5 engineers named from E1 to E5 - # M4, M5, E4 and E5 are dead - for i in range(1, 5): - if i < 4: - st = active - else: - st = dead - session.add( - Manager(name="M%d" % i, category="YYYYYYYYY", status=st) - ) - session.add(Engineer(name="E%d" % i, field="X", status=st)) + # TODO: we haven't created assertions for all + # the data combinations created here - session.flush() + # creating 5 managers named from M1 to M5 + # and 5 engineers named from E1 to E5 + # M4, M5, E4 and E5 are dead + for i in range(1, 5): + if i < 4: + st = active + else: + st = dead + session.add( + Manager(name="M%d" % i, category="YYYYYYYYY", status=st) + ) + session.add(Engineer(name="E%d" % i, field="X", status=st)) - # get E4 - engineer4 = session.query(Engineer).filter_by(name="E4").one() + # get E4 + engineer4 = session.query(Engineer).filter_by(name="E4").one() - # create 2 cars for E4, one active and one dead - car1 = Car(employee=engineer4, status=active) - car2 = Car(employee=engineer4, status=dead) - session.add(car1) - session.add(car2) - session.flush() + # create 2 cars for E4, one active and one dead + car1 = Car(employee=engineer4, status=active) + car2 = Car(employee=engineer4, status=dead) + session.add(car1) + session.add(car2) def test_join_to_q_person(self): Status, Person, Engineer, Manager, Car = self.classes( @@ -1357,7 +1351,7 @@ class GenerativeTest(fixtures.MappedTest, AssertsExecutionResults): ) session = create_session() r = session.query(Person).filter( - exists([1], Car.owner == Person.person_id) + exists().where(Car.owner == Person.person_id) ) eq_( @@ -1424,9 +1418,9 @@ class MultiLevelTest(fixtures.MappedTest): .where(table_Employee.c.atype == "Engineer") .select_from(table_Employee.join(table_Engineer)) .subquery(), - "Employee": table_Employee.select( - table_Employee.c.atype == "Employee" - ).subquery(), + "Employee": table_Employee.select() + .where(table_Employee.c.atype == "Employee") + .subquery(), }, None, "pu_employee", @@ -1540,9 +1534,9 @@ class ManyToManyPolyTest(fixtures.MappedTest): item_join = polymorphic_union( { - "BaseItem": base_item_table.select( - base_item_table.c.child_name == "BaseItem" - ).subquery(), + "BaseItem": base_item_table.select() + .where(base_item_table.c.child_name == "BaseItem") + .subquery(), "Item": base_item_table.join(item_table), }, None, @@ -1610,7 +1604,7 @@ class CustomPKTest(fixtures.MappedTest): # a 2-col pk in any case but the leading select has a NULL for the # "t2id" column d = util.OrderedDict() - d["t1"] = t1.select(t1.c.type == "t1").subquery() + d["t1"] = t1.select().where(t1.c.type == "t1").subquery() d["t2"] = t1.join(t2) pjoin = polymorphic_union(d, None, "pjoin") @@ -1634,9 +1628,9 @@ class CustomPKTest(fixtures.MappedTest): # query using get(), using only one value. # this requires the select_table mapper # has the same single-col primary key. - assert sess.query(T1).get(ot1.id).id == ot1.id + assert sess.get(T1, ot1.id).id == ot1.id - ot1 = sess.query(T1).get(ot1.id) + ot1 = sess.get(T1, ot1.id) ot1.data = "hi" sess.flush() @@ -1656,7 +1650,7 @@ class CustomPKTest(fixtures.MappedTest): # a 2-col pk in any case but the leading select has a NULL for the # "t2id" column d = util.OrderedDict() - d["t1"] = t1.select(t1.c.type == "t1").subquery() + d["t1"] = t1.select().where(t1.c.type == "t1").subquery() d["t2"] = t1.join(t2) pjoin = polymorphic_union(d, None, "pjoin") @@ -1681,9 +1675,9 @@ class CustomPKTest(fixtures.MappedTest): # query using get(), using only one value. this requires the # select_table mapper # has the same single-col primary key. - assert sess.query(T1).get(ot1.id).id == ot1.id + assert sess.get(T1, ot1.id).id == ot1.id - ot1 = sess.query(T1).get(ot1.id) + ot1 = sess.get(T1, ot1.id) ot1.data = "hi" sess.flush() diff --git a/test/orm/inheritance/test_basic.py b/test/orm/inheritance/test_basic.py index 6372812d2..9db11d362 100644 --- a/test/orm/inheritance/test_basic.py +++ b/test/orm/inheritance/test_basic.py @@ -19,7 +19,6 @@ from sqlalchemy.orm import class_mapper from sqlalchemy.orm import clear_mappers from sqlalchemy.orm import column_property from sqlalchemy.orm import composite -from sqlalchemy.orm import create_session from sqlalchemy.orm import deferred from sqlalchemy.orm import exc as orm_exc from sqlalchemy.orm import joinedload @@ -43,6 +42,7 @@ from sqlalchemy.testing.assertsql import CompiledSQL from sqlalchemy.testing.assertsql import Conditional from sqlalchemy.testing.assertsql import Or from sqlalchemy.testing.assertsql import RegexSQL +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table @@ -194,7 +194,7 @@ class PolyExpressionEagerLoad(fixtures.DeclarativeMappedTest): child_id = Column(Integer, ForeignKey("a.id")) child = relationship("A") - p_a = case([(discriminator == "a", "a")], else_="b") + p_a = case((discriminator == "a", "a"), else_="b") __mapper_args__ = { "polymorphic_identity": "a", @@ -455,7 +455,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest): def test_polymorphic_on_expr_explicit_map(self): t2, t1 = self.tables.t2, self.tables.t1 Parent, Child = self.classes.Parent, self.classes.Child - expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]) + expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")) mapper( Parent, t1, @@ -470,7 +470,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest): def test_polymorphic_on_expr_implicit_map_no_label_joined(self): t2, t1 = self.tables.t2, self.tables.t1 Parent, Child = self.classes.Parent, self.classes.Child - expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]) + expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")) mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr) mapper(Child, t2, inherits=Parent, polymorphic_identity="child") @@ -479,9 +479,9 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest): def test_polymorphic_on_expr_implicit_map_w_label_joined(self): t2, t1 = self.tables.t2, self.tables.t1 Parent, Child = self.classes.Parent, self.classes.Child - expr = case( - [(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")] - ).label(None) + expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")).label( + None + ) mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr) mapper(Child, t2, inherits=Parent, polymorphic_identity="child") @@ -492,7 +492,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest): with a standalone expr""" t1 = self.tables.t1 Parent, Child = self.classes.Parent, self.classes.Child - expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]) + expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")) mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr) mapper(Child, inherits=Parent, polymorphic_identity="child") @@ -503,9 +503,9 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest): with a standalone expr""" t1 = self.tables.t1 Parent, Child = self.classes.Parent, self.classes.Child - expr = case( - [(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")] - ).label(None) + expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")).label( + None + ) mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr) mapper(Child, inherits=Parent, polymorphic_identity="child") @@ -514,7 +514,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest): def test_polymorphic_on_column_prop(self): t2, t1 = self.tables.t2, self.tables.t1 Parent, Child = self.classes.Parent, self.classes.Child - expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]) + expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")) cprop = column_property(expr) mapper( Parent, @@ -530,7 +530,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest): def test_polymorphic_on_column_str_prop(self): t2, t1 = self.tables.t2, self.tables.t1 Parent, Child = self.classes.Parent, self.classes.Child - expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]) + expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")) cprop = column_property(expr) mapper( Parent, @@ -1158,13 +1158,10 @@ class GetTest(fixtures.MappedTest): class Blub(Bar): pass - def test_get_polymorphic(self): - self._do_get_test(True) - - def test_get_nonpolymorphic(self): - self._do_get_test(False) - - def _do_get_test(self, polymorphic): + @testing.combinations( + ("polymorphic", True), ("test_get_nonpolymorphic", False), id_="ia" + ) + def test_get(self, polymorphic): foo, Bar, Blub, blub, bar, Foo = ( self.tables.foo, self.classes.Bar, @@ -1197,18 +1194,18 @@ class GetTest(fixtures.MappedTest): if polymorphic: def go(): - assert sess.query(Foo).get(f.id) is f - assert sess.query(Foo).get(b.id) is b - assert sess.query(Foo).get(bl.id) is bl - assert sess.query(Bar).get(b.id) is b - assert sess.query(Bar).get(bl.id) is bl - assert sess.query(Blub).get(bl.id) is bl + assert sess.get(Foo, f.id) is f + assert sess.get(Foo, b.id) is b + assert sess.get(Foo, bl.id) is bl + assert sess.get(Bar, b.id) is b + assert sess.get(Bar, bl.id) is bl + assert sess.get(Blub, bl.id) is bl # test class mismatches - item is present # in the identity map but we requested a subclass - assert sess.query(Blub).get(f.id) is None - assert sess.query(Blub).get(b.id) is None - assert sess.query(Bar).get(f.id) is None + assert sess.get(Blub, f.id) is None + assert sess.get(Blub, b.id) is None + assert sess.get(Bar, f.id) is None self.assert_sql_count(testing.db, go, 0) else: @@ -1217,20 +1214,20 @@ class GetTest(fixtures.MappedTest): # polymorphic. the important part being that get() always # returns an instance of the query's type. def go(): - assert sess.query(Foo).get(f.id) is f + assert sess.get(Foo, f.id) is f - bb = sess.query(Foo).get(b.id) + bb = sess.get(Foo, b.id) assert isinstance(b, Foo) and bb.id == b.id - bll = sess.query(Foo).get(bl.id) + bll = sess.get(Foo, bl.id) assert isinstance(bll, Foo) and bll.id == bl.id - assert sess.query(Bar).get(b.id) is b + assert sess.get(Bar, b.id) is b - bll = sess.query(Bar).get(bl.id) + bll = sess.get(Bar, bl.id) assert isinstance(bll, Bar) and bll.id == bl.id - assert sess.query(Blub).get(bl.id) is bl + assert sess.get(Blub, bl.id) is bl self.assert_sql_count(testing.db, go, 3) @@ -1241,8 +1238,7 @@ class EagerLazyTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): - global foo, bar, bar_foo - foo = Table( + Table( "foo", metadata, Column( @@ -1250,22 +1246,25 @@ class EagerLazyTest(fixtures.MappedTest): ), Column("data", String(30)), ) - bar = Table( + Table( "bar", metadata, Column("id", Integer, ForeignKey("foo.id"), primary_key=True), Column("bar_data", String(30)), ) - bar_foo = Table( + Table( "bar_foo", metadata, Column("bar_id", Integer, ForeignKey("bar.id")), Column("foo_id", Integer, ForeignKey("foo.id")), ) - def test_basic(self): - class Foo(object): + @classmethod + def setup_mappers(cls): + foo, bar, bar_foo = cls.tables("foo", "bar", "bar_foo") + + class Foo(cls.Comparable): pass class Bar(Foo): @@ -1278,17 +1277,24 @@ class EagerLazyTest(fixtures.MappedTest): "eager", relationship(foos, bar_foo, lazy="joined", viewonly=True) ) - foo.insert().execute(data="foo1") - bar.insert().execute(id=1, data="bar1") + @classmethod + def insert_data(cls, connection): + foo, bar, bar_foo = cls.tables("foo", "bar", "bar_foo") + + connection.execute(foo.insert(), dict(data="foo1")) + connection.execute(bar.insert(), dict(id=1, data="bar1")) + + connection.execute(foo.insert(), dict(data="foo2")) + connection.execute(bar.insert(), dict(id=2, data="bar2")) - foo.insert().execute(data="foo2") - bar.insert().execute(id=2, data="bar2") + connection.execute(foo.insert(), dict(data="foo3")) # 3 + connection.execute(foo.insert(), dict(data="foo4")) # 4 - foo.insert().execute(data="foo3") # 3 - foo.insert().execute(data="foo4") # 4 + connection.execute(bar_foo.insert(), dict(bar_id=1, foo_id=3)) + connection.execute(bar_foo.insert(), dict(bar_id=2, foo_id=4)) - bar_foo.insert().execute(bar_id=1, foo_id=3) - bar_foo.insert().execute(bar_id=2, foo_id=4) + def test_basic(self): + Bar = self.classes.Bar sess = create_session() q = sess.query(Bar) @@ -1464,7 +1470,7 @@ class FlushTest(fixtures.MappedTest): sess.add(a) sess.flush() - eq_(select(func.count("*")).select_from(user_roles).scalar(), 1) + eq_(sess.scalar(select(func.count("*")).select_from(user_roles)), 1) def test_two(self): admins, users, roles, user_roles = ( @@ -1514,7 +1520,7 @@ class FlushTest(fixtures.MappedTest): a.password = "sadmin" sess.flush() - eq_(select(func.count("*")).select_from(user_roles).scalar(), 1) + eq_(sess.scalar(select(func.count("*")).select_from(user_roles)), 1) class PassiveDeletesTest(fixtures.MappedTest): @@ -2095,16 +2101,15 @@ class DistinctPKTest(fixtures.MappedTest): def _do_test(self, composite): session = create_session() - query = session.query(Employee) if composite: - alice1 = query.get([1, 2]) - bob = query.get([2, 3]) - alice2 = query.get([1, 2]) + alice1 = session.get(Employee, [1, 2]) + bob = session.get(Employee, [2, 3]) + alice2 = session.get(Employee, [1, 2]) else: - alice1 = query.get(1) - bob = query.get(2) - alice2 = query.get(1) + alice1 = session.get(Employee, 1) + bob = session.get(Employee, 2) + alice2 = session.get(Employee, 1) assert alice1.name == alice2.name == "alice" assert bob.name == "bob" @@ -2142,22 +2147,23 @@ class SyncCompileTest(fixtures.MappedTest): Column("data3", String(128)), ) - def test_joins(self): - for j1 in ( - None, - _b_table.c.a_id == _a_table.c.id, - _a_table.c.id == _b_table.c.a_id, - ): - for j2 in ( - None, - _b_table.c.a_id == _c_table.c.b_a_id, - _c_table.c.b_a_id == _b_table.c.a_id, - ): - self._do_test(j1, j2) - for t in reversed(_a_table.metadata.sorted_tables): - t.delete().execute().close() - - def _do_test(self, j1, j2): + @testing.combinations( + lambda _a_table, _b_table: None, + lambda _a_table, _b_table: _b_table.c.a_id == _a_table.c.id, + lambda _a_table, _b_table: _a_table.c.id == _b_table.c.a_id, + argnames="j1", + ) + @testing.combinations( + lambda _b_table, _c_table: None, + lambda _b_table, _c_table: _b_table.c.a_id == _c_table.c.b_a_id, + lambda _b_table, _c_table: _c_table.c.b_a_id == _b_table.c.a_id, + argnames="j2", + ) + def test_joins(self, j1, j2): + _a_table, _b_table, _c_table = self.tables("a", "b", "c") + j1 = testing.resolve_lambda(j1, **locals()) + j2 = testing.resolve_lambda(j2, **locals()) + class A(object): def __init__(self, **kwargs): for key, value in list(kwargs.items()): @@ -2282,7 +2288,7 @@ class OverrideColKeyTest(fixtures.MappedTest): sess = create_session() sess.add(s1) sess.flush() - assert sess.query(Sub).get(10) is s1 + assert sess.get(Sub, 10) is s1 def test_override_onlyinparent(self): class Base(object): @@ -2312,7 +2318,7 @@ class OverrideColKeyTest(fixtures.MappedTest): sess.flush() # s1 gets '10' - assert sess.query(Sub).get(10) is s1 + assert sess.get(Sub, 10) is s1 # s2 gets a new id, base_id is overwritten by the ultimate # PK col @@ -2432,8 +2438,8 @@ class OverrideColKeyTest(fixtures.MappedTest): sess.flush() sess.expunge_all() - assert sess.query(Base).get(b1.base_id).subdata == "this is base" - assert sess.query(Sub).get(s1.base_id).subdata == "this is sub" + assert sess.get(Base, b1.base_id).subdata == "this is base" + assert sess.get(Sub, s1.base_id).subdata == "this is sub" def test_base_descriptors_over_base_cols(self): class Base(object): @@ -2457,8 +2463,8 @@ class OverrideColKeyTest(fixtures.MappedTest): sess.flush() sess.expunge_all() - assert sess.query(Base).get(b1.base_id).data == "this is base" - assert sess.query(Sub).get(s1.base_id).data == "this is base" + assert sess.get(Base, b1.base_id).data == "this is base" + assert sess.get(Sub, s1.base_id).data == "this is base" class OptimizedLoadTest(fixtures.MappedTest): @@ -2587,7 +2593,7 @@ class OptimizedLoadTest(fixtures.MappedTest): polymorphic_identity="sub", with_polymorphic=( "*", - base.outerjoin(sub).select(use_labels=True).alias("foo"), + base.outerjoin(sub).select().apply_labels().alias("foo"), ), ) sess = Session() @@ -3602,4 +3608,4 @@ class NameConflictTest(fixtures.MappedTest): sess.flush() f_id = f.id sess.expunge_all() - assert sess.query(Content).get(f_id).content_type == "bar" + assert sess.get(Content, f_id).content_type == "bar" diff --git a/test/orm/inheritance/test_concrete.py b/test/orm/inheritance/test_concrete.py index 376337b25..9b0a050d7 100644 --- a/test/orm/inheritance/test_concrete.py +++ b/test/orm/inheritance/test_concrete.py @@ -7,7 +7,6 @@ from sqlalchemy.orm import attributes from sqlalchemy.orm import class_mapper from sqlalchemy.orm import clear_mappers from sqlalchemy.orm import configure_mappers -from sqlalchemy.orm import create_session from sqlalchemy.orm import joinedload from sqlalchemy.orm import mapper from sqlalchemy.orm import polymorphic_union @@ -18,6 +17,7 @@ from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import mock +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table @@ -359,7 +359,7 @@ class ConcreteTest(fixtures.MappedTest): eq_(test_calls.mock_calls, [mock.call.engineer_info_instance()]) session.add(tom) - session.flush() + session.commit() session.close() @@ -443,9 +443,9 @@ class ConcreteTest(fixtures.MappedTest): assert ( len( - testing.db.execute( - session.query(Employee).with_labels().statement - ).fetchall() + session.connection() + .execute(session.query(Employee).with_labels().statement) + .fetchall() ) == 3 ) @@ -518,17 +518,19 @@ class ConcreteTest(fixtures.MappedTest): session.flush() eq_( len( - testing.db.execute( + session.connection() + .execute( session.query(Employee) .with_polymorphic("*", pjoin, pjoin.c.type) .with_labels() .statement - ).fetchall() + ) + .fetchall() ), 4, ) - eq_(session.query(Employee).get(jdoe.employee_id), jdoe) - eq_(session.query(Engineer).get(jerry.employee_id), jerry) + eq_(session.get(Employee, jdoe.employee_id), jdoe) + eq_(session.get(Engineer, jerry.employee_id), jerry) eq_( set( [ @@ -575,33 +577,41 @@ class ConcreteTest(fixtures.MappedTest): # test adaption of the column by wrapping the query in a # subquery - eq_( - len( - testing.db.execute( - session.query(Engineer) - .with_polymorphic("*", pjoin2, pjoin2.c.type) - .from_self() - .statement - ).fetchall() - ), - 2, - ) - eq_( - set( - [ - repr(x) - for x in session.query(Engineer) - .with_polymorphic("*", pjoin2, pjoin2.c.type) - .from_self() - ] - ), - set( - [ - "Engineer Jerry knows how to program", - "Hacker Kurt 'Badass' knows how to hack", - ] - ), - ) + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + eq_( + len( + session.connection() + .execute( + session.query(Engineer) + .with_polymorphic("*", pjoin2, pjoin2.c.type) + .from_self() + .statement + ) + .fetchall() + ), + 2, + ) + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + eq_( + set( + [ + repr(x) + for x in session.query(Engineer) + .with_polymorphic("*", pjoin2, pjoin2.c.type) + .from_self() + ] + ), + set( + [ + "Engineer Jerry knows how to program", + "Hacker Kurt 'Badass' knows how to hack", + ] + ), + ) def test_relationship(self): pjoin = polymorphic_union( @@ -638,7 +648,7 @@ class ConcreteTest(fixtures.MappedTest): session.expunge_all() def go(): - c2 = session.query(Company).get(c.id) + c2 = session.get(Company, c.id) assert set([repr(x) for x in c2.employees]) == set( [ "Engineer Kurt knows how to hack", @@ -650,10 +660,8 @@ class ConcreteTest(fixtures.MappedTest): session.expunge_all() def go(): - c2 = ( - session.query(Company) - .options(joinedload(Company.employees)) - .get(c.id) + c2 = session.get( + Company, c.id, options=[joinedload(Company.employees)] ) assert set([repr(x) for x in c2.employees]) == set( [ @@ -1174,13 +1182,17 @@ class ColKeysTest(fixtures.MappedTest): def insert_data(cls, connection): connection.execute( refugees_table.insert(), - dict(refugee_fid=1, name="refugee1"), - dict(refugee_fid=2, name="refugee2"), + [ + dict(refugee_fid=1, name="refugee1"), + dict(refugee_fid=2, name="refugee2"), + ], ) connection.execute( offices_table.insert(), - dict(office_fid=1, name="office1"), - dict(office_fid=2, name="office2"), + [ + dict(office_fid=1, name="office1"), + dict(office_fid=2, name="office2"), + ], ) def test_keys(self): @@ -1220,7 +1232,7 @@ class ColKeysTest(fixtures.MappedTest): polymorphic_identity="refugee", ) sess = create_session() - eq_(sess.query(Refugee).get(1).name, "refugee1") - eq_(sess.query(Refugee).get(2).name, "refugee2") - eq_(sess.query(Office).get(1).name, "office1") - eq_(sess.query(Office).get(2).name, "office2") + eq_(sess.get(Refugee, 1).name, "refugee1") + eq_(sess.get(Refugee, 2).name, "refugee2") + eq_(sess.get(Office, 1).name, "office1") + eq_(sess.get(Office, 2).name, "office2") diff --git a/test/orm/inheritance/test_magazine.py b/test/orm/inheritance/test_magazine.py index 228cb1273..334ed22f3 100644 --- a/test/orm/inheritance/test_magazine.py +++ b/test/orm/inheritance/test_magazine.py @@ -247,9 +247,9 @@ class MagazineTest(fixtures.MappedTest): "c": self.tables.page.join(self.tables.magazine_page).join( self.tables.classified_page ), - "p": self.tables.page.select( - self.tables.page.c.type == "p" - ).subquery(), + "p": self.tables.page.select() + .where(self.tables.page.c.type == "p") + .subquery(), }, None, "page_join", diff --git a/test/orm/inheritance/test_manytomany.py b/test/orm/inheritance/test_manytomany.py index dc162321f..207ac09c7 100644 --- a/test/orm/inheritance/test_manytomany.py +++ b/test/orm/inheritance/test_manytomany.py @@ -5,11 +5,11 @@ from sqlalchemy import Sequence from sqlalchemy import String from sqlalchemy import Table from sqlalchemy.orm import class_mapper -from sqlalchemy.orm import create_session from sqlalchemy.orm import mapper from sqlalchemy.orm import relationship from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures +from sqlalchemy.testing.fixtures import create_session class InheritTest(fixtures.MappedTest): @@ -171,7 +171,7 @@ class InheritTest2(fixtures.MappedTest): # test that "bar.bid" does not need to be referenced in a get # (ticket 185) - assert sess.query(Bar).get(b.id).id == b.id + assert sess.get(Bar, b.id).id == b.id def test_basic(self): class Foo(object): diff --git a/test/orm/inheritance/test_poly_linked_list.py b/test/orm/inheritance/test_poly_linked_list.py index e51026858..91be4d167 100644 --- a/test/orm/inheritance/test_poly_linked_list.py +++ b/test/orm/inheritance/test_poly_linked_list.py @@ -4,10 +4,10 @@ from sqlalchemy import String from sqlalchemy.orm import backref from sqlalchemy.orm import clear_mappers from sqlalchemy.orm import configure_mappers -from sqlalchemy.orm import create_session from sqlalchemy.orm import mapper from sqlalchemy.orm import relationship from sqlalchemy.testing import fixtures +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table diff --git a/test/orm/inheritance/test_poly_persistence.py b/test/orm/inheritance/test_poly_persistence.py index 25eeb3d1d..99cab870b 100644 --- a/test/orm/inheritance/test_poly_persistence.py +++ b/test/orm/inheritance/test_poly_persistence.py @@ -6,7 +6,6 @@ from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import testing -from sqlalchemy.orm import create_session from sqlalchemy.orm import mapper from sqlalchemy.orm import polymorphic_union from sqlalchemy.orm import relationship @@ -15,6 +14,7 @@ from sqlalchemy.testing import assert_raises from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column @@ -123,7 +123,9 @@ class InsertOrderTest(PolymorphTest): { "engineer": people.join(engineers), "manager": people.join(managers), - "person": people.select(people.c.type == "person").subquery(), + "person": people.select() + .where(people.c.type == "person") + .subquery(), }, None, "pjoin", @@ -191,7 +193,7 @@ class InsertOrderTest(PolymorphTest): session.add(c) session.flush() session.expunge_all() - eq_(session.query(Company).get(c.company_id), c) + eq_(session.get(Company, c.company_id), c) @testing.combinations( @@ -235,9 +237,9 @@ class RoundTripTest(PolymorphTest): { "engineer": people.join(engineers), "manager": people.join(managers), - "person": people.select( - people.c.type == "person" - ).subquery(), + "person": people.select() + .where(people.c.type == "person") + .subquery(), }, None, "pjoin", @@ -399,7 +401,7 @@ class RoundTripTest(PolymorphTest): employees = session.query(Person).order_by(Person.person_id).all() company = session.query(Company).first() - eq_(session.query(Person).get(dilbert.person_id), dilbert) + eq_(session.get(Person, dilbert.person_id), dilbert) session.expunge_all() eq_( @@ -411,7 +413,7 @@ class RoundTripTest(PolymorphTest): session.expunge_all() def go(): - cc = session.query(Company).get(company.company_id) + cc = session.get(Company, company.company_id) eq_(cc.employees, employees) if not lazy_relationship: @@ -471,7 +473,7 @@ class RoundTripTest(PolymorphTest): # the "palias" alias does *not* get sucked up # into the "person_join" conversion. palias = people.alias("palias") - dilbert = session.query(Person).get(dilbert.person_id) + dilbert = session.get(Person, dilbert.person_id) is_( dilbert, session.query(Person) diff --git a/test/orm/inheritance/test_polymorphic_rel.py b/test/orm/inheritance/test_polymorphic_rel.py index 86e0bd360..69a485d41 100644 --- a/test/orm/inheritance/test_polymorphic_rel.py +++ b/test/orm/inheritance/test_polymorphic_rel.py @@ -5,7 +5,6 @@ from sqlalchemy import select from sqlalchemy import testing from sqlalchemy import true from sqlalchemy.orm import aliased -from sqlalchemy.orm import create_session from sqlalchemy.orm import defaultload from sqlalchemy.orm import join from sqlalchemy.orm import joinedload @@ -15,6 +14,7 @@ from sqlalchemy.orm import with_polymorphic from sqlalchemy.testing import assert_raises from sqlalchemy.testing import eq_ from sqlalchemy.testing.assertsql import CompiledSQL +from sqlalchemy.testing.fixtures import create_session from ._poly_fixtures import _Polymorphic from ._poly_fixtures import _PolymorphicAliasedJoins from ._poly_fixtures import _PolymorphicJoins @@ -148,18 +148,18 @@ class _PolymorphicTestBase(object): self.assert_sql_count(testing.db, go, 3) eq_( - select(func.count("*")) - .select_from( - sess.query(Person) - .with_polymorphic("*") - .options(joinedload(Engineer.machines)) - .order_by(Person.person_id) - .limit(2) - .offset(1) - .with_labels() - .subquery() - ) - .scalar(), + sess.scalar( + select(func.count("*")).select_from( + sess.query(Person) + .with_polymorphic("*") + .options(joinedload(Engineer.machines)) + .order_by(Person.person_id) + .limit(2) + .offset(1) + .with_labels() + .subquery() + ) + ), 2, ) @@ -170,21 +170,21 @@ class _PolymorphicTestBase(object): """ sess = create_session() eq_( - sess.query(Person).get(e1.person_id), + sess.get(Person, e1.person_id), Engineer(name="dilbert", primary_language="java"), ) def test_get_two(self): sess = create_session() eq_( - sess.query(Engineer).get(e1.person_id), + sess.get(Engineer, e1.person_id), Engineer(name="dilbert", primary_language="java"), ) def test_get_three(self): sess = create_session() eq_( - sess.query(Manager).get(b1.person_id), + sess.get(Manager, b1.person_id), Boss(name="pointy haired boss", golf_swing="fore"), ) @@ -230,7 +230,7 @@ class _PolymorphicTestBase(object): ) def test_multi_join_future(self): - sess = create_session(testing.db, future=True) + sess = create_session(future=True) e = aliased(Person) c = aliased(Company) @@ -283,7 +283,7 @@ class _PolymorphicTestBase(object): eq_(sess.query(Engineer).all()[0], Engineer(name="dilbert")) def test_filter_on_subclass_one_future(self): - sess = create_session(testing.db, future=True) + sess = create_session(future=True) eq_( sess.execute(select(Engineer)).scalar(), Engineer(name="dilbert"), ) @@ -337,7 +337,7 @@ class _PolymorphicTestBase(object): ) def test_join_from_polymorphic_nonaliased_one_future(self): - sess = create_session(testing.db, future=True) + sess = create_session(future=True) eq_( sess.execute( select(Person) @@ -396,7 +396,7 @@ class _PolymorphicTestBase(object): ) def test_join_from_polymorphic_flag_aliased_one_future(self): - sess = create_session(testing.db, future=True) + sess = create_session(future=True) pa = aliased(Paperwork) eq_( @@ -496,7 +496,7 @@ class _PolymorphicTestBase(object): ) def test_join_from_with_polymorphic_nonaliased_one_future(self): - sess = create_session(testing.db, future=True) + sess = create_session(future=True) pm = with_polymorphic(Person, [Manager]) eq_( @@ -1552,16 +1552,19 @@ class _PolymorphicTestBase(object): palias = aliased(Person) expected = [(m1, e1), (m1, e2), (m1, b1)] - eq_( - sess.query(Person, palias) - .filter(Person.company_id == palias.company_id) - .filter(Person.name == "dogbert") - .filter(Person.person_id > palias.person_id) - .from_self() - .order_by(Person.person_id, palias.person_id) - .all(), - expected, - ) + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + eq_( + sess.query(Person, palias) + .filter(Person.company_id == palias.company_id) + .filter(Person.name == "dogbert") + .filter(Person.person_id > palias.person_id) + .from_self() + .order_by(Person.person_id, palias.person_id) + .all(), + expected, + ) def test_self_referential_two_point_five(self): """Using two aliases, the above case works. @@ -1572,21 +1575,24 @@ class _PolymorphicTestBase(object): expected = [(m1, e1), (m1, e2), (m1, b1)] - eq_( - sess.query(palias, palias2) - .filter(palias.company_id == palias2.company_id) - .filter(palias.name == "dogbert") - .filter(palias.person_id > palias2.person_id) - .from_self() - .order_by(palias.person_id, palias2.person_id) - .all(), - expected, - ) + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + eq_( + sess.query(palias, palias2) + .filter(palias.company_id == palias2.company_id) + .filter(palias.name == "dogbert") + .filter(palias.person_id > palias2.person_id) + .from_self() + .order_by(palias.person_id, palias2.person_id) + .all(), + expected, + ) def test_self_referential_two_future(self): # TODO: this is the SECOND test *EVER* of an aliased class of # an aliased class. - sess = create_session(testing.db, future=True) + sess = create_session(future=True) expected = [(m1, e1), (m1, e2), (m1, b1)] # not aliasing the first class @@ -1615,7 +1621,7 @@ class _PolymorphicTestBase(object): # TODO: this is the first test *EVER* of an aliased class of # an aliased class. we should add many more tests for this. # new case added in Id810f485c5f7ed971529489b84694e02a3356d6d - sess = create_session(testing.db, future=True) + sess = create_session(future=True) expected = [(m1, e1), (m1, e2), (m1, b1)] # aliasing the first class diff --git a/test/orm/inheritance/test_productspec.py b/test/orm/inheritance/test_productspec.py index aab3570c9..5fd2c5a6f 100644 --- a/test/orm/inheritance/test_productspec.py +++ b/test/orm/inheritance/test_productspec.py @@ -7,11 +7,11 @@ from sqlalchemy import Integer from sqlalchemy import LargeBinary from sqlalchemy import String from sqlalchemy.orm import backref -from sqlalchemy.orm import create_session from sqlalchemy.orm import deferred from sqlalchemy.orm import mapper from sqlalchemy.orm import relationship from sqlalchemy.testing import fixtures +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table diff --git a/test/orm/inheritance/test_relationship.py b/test/orm/inheritance/test_relationship.py index c7d2042b3..8590949a7 100644 --- a/test/orm/inheritance/test_relationship.py +++ b/test/orm/inheritance/test_relationship.py @@ -8,12 +8,12 @@ from sqlalchemy.orm import aliased from sqlalchemy.orm import backref from sqlalchemy.orm import configure_mappers from sqlalchemy.orm import contains_eager -from sqlalchemy.orm import create_session from sqlalchemy.orm import joinedload from sqlalchemy.orm import mapper from sqlalchemy.orm import relationship from sqlalchemy.orm import selectinload from sqlalchemy.orm import Session +from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import subqueryload from sqlalchemy.orm import with_polymorphic from sqlalchemy.testing import AssertsCompiledSQL @@ -21,6 +21,7 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing.entities import ComparableEntity +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table @@ -589,10 +590,9 @@ class M2MFilterTest(fixtures.MappedTest): e4 = Engineer(name="e4") org1 = Organization(name="org1", engineers=[e1, e2]) org2 = Organization(name="org2", engineers=[e3, e4]) - sess = create_session(connection) - sess.add(org1) - sess.add(org2) - sess.flush() + with sessionmaker(connection).begin() as sess: + sess.add(org1) + sess.add(org2) def test_not_contains(self): Organization = self.classes.Organization @@ -838,9 +838,11 @@ class SelfReferentialM2MTest(fixtures.MappedTest, AssertsCompiledSQL): # another way to check eq_( - select(func.count("*")) - .select_from(q.limit(1).with_labels().subquery()) - .scalar(), + sess.scalar( + select(func.count("*")).select_from( + q.limit(1).with_labels().subquery() + ) + ), 1, ) assert q.first() is c1 @@ -1134,15 +1136,16 @@ class SubClassEagerToSubClassTest(fixtures.MappedTest): global p1, p2 Sub, Subparent = cls.classes.Sub, cls.classes.Subparent - sess = create_session(connection) - p1 = Subparent( - data="p1", - children=[Sub(data="s1"), Sub(data="s2"), Sub(data="s3")], - ) - p2 = Subparent(data="p2", children=[Sub(data="s4"), Sub(data="s5")]) - sess.add(p1) - sess.add(p2) - sess.flush() + with sessionmaker(connection).begin() as sess: + p1 = Subparent( + data="p1", + children=[Sub(data="s1"), Sub(data="s2"), Sub(data="s3")], + ) + p2 = Subparent( + data="p2", children=[Sub(data="s4"), Sub(data="s5")] + ) + sess.add(p1) + sess.add(p2) def test_joinedload(self): Subparent = self.classes.Subparent @@ -1725,7 +1728,7 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest): "JOIN ep2 ON base2.id = ep2.base2_id", ) - def test_six(self): + def test_six_legacy(self): Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes() s = Session() @@ -1735,8 +1738,38 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest): # this query is coming out instead which is equivalent, but not # totally sure where this happens + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + self.assert_compile( + s.query(Sub2).from_self().join(Sub2.ep1).join(Sub2.ep2), + "SELECT anon_1.sub2_id AS anon_1_sub2_id, " + "anon_1.base2_base1_id AS anon_1_base2_base1_id, " + "anon_1.base2_data AS anon_1_base2_data, " + "anon_1.sub2_subdata AS anon_1_sub2_subdata " + "FROM (SELECT sub2.id AS sub2_id, base2.id AS base2_id, " + "base2.base1_id AS base2_base1_id, base2.data AS base2_data, " + "sub2.subdata AS sub2_subdata " + "FROM base2 JOIN sub2 ON base2.id = sub2.id) AS anon_1 " + "JOIN ep1 ON anon_1.sub2_id = ep1.base2_id " + "JOIN ep2 ON anon_1.sub2_id = ep2.base2_id", + ) + + def test_six(self): + Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes() + + # as of from_self() changing in + # I3abfb45dd6e50f84f29d39434caa0b550ce27864, + # this query is coming out instead which is equivalent, but not + # totally sure where this happens + + stmt = select(Sub2) + + subq = aliased(Sub2, stmt.apply_labels().subquery()) + + stmt = select(subq).join(subq.ep1).join(Sub2.ep2).apply_labels() self.assert_compile( - s.query(Sub2).from_self().join(Sub2.ep1).join(Sub2.ep2), + stmt, "SELECT anon_1.sub2_id AS anon_1_sub2_id, " "anon_1.base2_base1_id AS anon_1_base2_base1_id, " "anon_1.base2_data AS anon_1_base2_data, " @@ -1749,7 +1782,7 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest): "JOIN ep2 ON anon_1.sub2_id = ep2.base2_id", ) - def test_seven(self): + def test_seven_legacy(self): Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes() s = Session() @@ -1758,16 +1791,74 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest): # I3abfb45dd6e50f84f29d39434caa0b550ce27864, # this query is coming out instead which is equivalent, but not # totally sure where this happens + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + + self.assert_compile( + # adding Sub2 to the entities list helps it, + # otherwise the joins for Sub2.ep1/ep2 don't have columns + # to latch onto. Can't really make it better than this + s.query(Parent, Sub2) + .join(Parent.sub1) + .join(Sub1.sub2) + .from_self() + .join(Sub2.ep1) + .join(Sub2.ep2), + "SELECT anon_1.parent_id AS anon_1_parent_id, " + "anon_1.parent_data AS anon_1_parent_data, " + "anon_1.sub2_id AS anon_1_sub2_id, " + "anon_1.base2_base1_id AS anon_1_base2_base1_id, " + "anon_1.base2_data AS anon_1_base2_data, " + "anon_1.sub2_subdata AS anon_1_sub2_subdata " + "FROM (SELECT parent.id AS parent_id, " + "parent.data AS parent_data, " + "sub2.id AS sub2_id, " + "base2.id AS base2_id, " + "base2.base1_id AS base2_base1_id, " + "base2.data AS base2_data, " + "sub2.subdata AS sub2_subdata " + "FROM parent JOIN (base1 JOIN sub1 ON base1.id = sub1.id) " + "ON parent.id = sub1.parent_id JOIN " + "(base2 JOIN sub2 ON base2.id = sub2.id) " + "ON base1.id = base2.base1_id) AS anon_1 " + "JOIN ep1 ON anon_1.sub2_id = ep1.base2_id " + "JOIN ep2 ON anon_1.sub2_id = ep2.base2_id", + ) + + def test_seven(self): + Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes() + + # as of from_self() changing in + # I3abfb45dd6e50f84f29d39434caa0b550ce27864, + # this query is coming out instead which is equivalent, but not + # totally sure where this happens + + subq = ( + select(Parent, Sub2) + .join(Parent.sub1) + .join(Sub1.sub2) + .apply_labels() + .subquery() + ) + + # another 1.4 supercharged select() statement ;) + + palias = aliased(Parent, subq) + sub2alias = aliased(Sub2, subq) + + stmt = ( + select(palias, sub2alias) + .join(sub2alias.ep1) + .join(sub2alias.ep2) + .apply_labels() + ) + self.assert_compile( # adding Sub2 to the entities list helps it, # otherwise the joins for Sub2.ep1/ep2 don't have columns # to latch onto. Can't really make it better than this - s.query(Parent, Sub2) - .join(Parent.sub1) - .join(Sub1.sub2) - .from_self() - .join(Sub2.ep1) - .join(Sub2.ep2), + stmt, "SELECT anon_1.parent_id AS anon_1_parent_id, " "anon_1.parent_data AS anon_1_parent_data, " "anon_1.sub2_id AS anon_1_sub2_id, " diff --git a/test/orm/inheritance/test_single.py b/test/orm/inheritance/test_single.py index 4adb1a5cb..65dd9c251 100644 --- a/test/orm/inheritance/test_single.py +++ b/test/orm/inheritance/test_single.py @@ -12,7 +12,6 @@ from sqlalchemy import testing from sqlalchemy import true from sqlalchemy.orm import aliased from sqlalchemy.orm import Bundle -from sqlalchemy.orm import create_session from sqlalchemy.orm import joinedload from sqlalchemy.orm import mapper from sqlalchemy.orm import relationship @@ -24,6 +23,7 @@ from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing.assertsql import CompiledSQL +from sqlalchemy.testing.fixtures import create_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table @@ -280,12 +280,53 @@ class SingleInheritanceTest(testing.AssertsCompiledSQL, fixtures.MappedTest): [e2id], ) - def test_from_self(self): + def test_from_self_legacy(self): Engineer = self.classes.Engineer sess = create_session() + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + self.assert_compile( + sess.query(Engineer).from_self(), + "SELECT anon_1.employees_employee_id AS " + "anon_1_employees_employee_id, " + "anon_1.employees_name AS " + "anon_1_employees_name, " + "anon_1.employees_manager_data AS " + "anon_1_employees_manager_data, " + "anon_1.employees_engineer_info AS " + "anon_1_employees_engineer_info, " + "anon_1.employees_type AS " + "anon_1_employees_type FROM (SELECT " + "employees.employee_id AS " + "employees_employee_id, employees.name AS " + "employees_name, employees.manager_data AS " + "employees_manager_data, " + "employees.engineer_info AS " + "employees_engineer_info, employees.type " + "AS employees_type FROM employees WHERE " + "employees.type IN ([POSTCOMPILE_type_1])) AS " + "anon_1", + use_default_dialect=True, + ) + + def test_from_subq(self): + Engineer = self.classes.Engineer + + stmt = select(Engineer) + subq = aliased(Engineer, stmt.apply_labels().subquery()) + + # so here we have an extra "WHERE type in ()", because + # both the inner and the outer queries have the Engineer entity. + # this is expected at the moment but it would be nice if + # _enable_single_crit or something similar could propagate here. + # legacy from_self() takes care of this because it applies + # _enable_single_crit at that moment. + + stmt = select(subq).apply_labels() self.assert_compile( - sess.query(Engineer).from_self(), + stmt, "SELECT anon_1.employees_employee_id AS " "anon_1_employees_employee_id, " "anon_1.employees_name AS " @@ -304,7 +345,7 @@ class SingleInheritanceTest(testing.AssertsCompiledSQL, fixtures.MappedTest): "employees_engineer_info, employees.type " "AS employees_type FROM employees WHERE " "employees.type IN ([POSTCOMPILE_type_1])) AS " - "anon_1", + "anon_1 WHERE anon_1.employees_type IN ([POSTCOMPILE_type_2])", use_default_dialect=True, ) @@ -382,14 +423,17 @@ class SingleInheritanceTest(testing.AssertsCompiledSQL, fixtures.MappedTest): sess = create_session() col = func.count(literal_column("*")) - self.assert_compile( - sess.query(Engineer.employee_id).from_self(col), - "SELECT count(*) AS count_1 " - "FROM (SELECT employees.employee_id AS employees_employee_id " - "FROM employees " - "WHERE employees.type IN ([POSTCOMPILE_type_1])) AS anon_1", - use_default_dialect=True, - ) + with testing.expect_deprecated( + r"The Query.from_self\(\) function/method" + ): + self.assert_compile( + sess.query(Engineer.employee_id).from_self(col), + "SELECT count(*) AS count_1 " + "FROM (SELECT employees.employee_id AS employees_employee_id " + "FROM employees " + "WHERE employees.type IN ([POSTCOMPILE_type_1])) AS anon_1", + use_default_dialect=True, + ) def test_select_from_count(self): Manager, Engineer = (self.classes.Manager, self.classes.Engineer) |
