diff options
Diffstat (limited to 'test/orm')
| -rw-r--r-- | test/orm/test_bulk.py | 358 | ||||
| -rw-r--r-- | test/orm/test_cycles.py | 28 | ||||
| -rw-r--r-- | test/orm/test_deferred.py | 133 | ||||
| -rw-r--r-- | test/orm/test_loading.py | 33 | ||||
| -rw-r--r-- | test/orm/test_mapper.py | 13 | ||||
| -rw-r--r-- | test/orm/test_naturalpks.py | 76 | ||||
| -rw-r--r-- | test/orm/test_query.py | 105 | ||||
| -rw-r--r-- | test/orm/test_session.py | 7 | ||||
| -rw-r--r-- | test/orm/test_transaction.py | 17 | ||||
| -rw-r--r-- | test/orm/test_unitofworkv2.py | 99 | ||||
| -rw-r--r-- | test/orm/test_versioning.py | 28 |
11 files changed, 867 insertions, 30 deletions
diff --git a/test/orm/test_bulk.py b/test/orm/test_bulk.py new file mode 100644 index 000000000..e27d3b73c --- /dev/null +++ b/test/orm/test_bulk.py @@ -0,0 +1,358 @@ +from sqlalchemy import testing +from sqlalchemy.testing import eq_ +from sqlalchemy.testing.schema import Table, Column +from sqlalchemy.testing import fixtures +from sqlalchemy import Integer, String, ForeignKey +from sqlalchemy.orm import mapper, Session +from sqlalchemy.testing.assertsql import CompiledSQL +from test.orm import _fixtures + + +class BulkTest(testing.AssertsExecutionResults): + run_inserts = None + run_define_tables = 'each' + + +class BulkInsertUpdateTest(BulkTest, _fixtures.FixtureTest): + + @classmethod + def setup_mappers(cls): + User, Address = cls.classes("User", "Address") + u, a = cls.tables("users", "addresses") + + mapper(User, u) + mapper(Address, a) + + def test_bulk_save_return_defaults(self): + User, = self.classes("User",) + + s = Session() + objects = [ + User(name="u1"), + User(name="u2"), + User(name="u3") + ] + assert 'id' not in objects[0].__dict__ + + with self.sql_execution_asserter() as asserter: + s.bulk_save_objects(objects, return_defaults=True) + + asserter.assert_( + CompiledSQL( + "INSERT INTO users (name) VALUES (:name)", + [{'name': 'u1'}] + ), + CompiledSQL( + "INSERT INTO users (name) VALUES (:name)", + [{'name': 'u2'}] + ), + CompiledSQL( + "INSERT INTO users (name) VALUES (:name)", + [{'name': 'u3'}] + ), + ) + eq_(objects[0].__dict__['id'], 1) + + def test_bulk_save_no_defaults(self): + User, = self.classes("User",) + + s = Session() + objects = [ + User(name="u1"), + User(name="u2"), + User(name="u3") + ] + assert 'id' not in objects[0].__dict__ + + with self.sql_execution_asserter() as asserter: + s.bulk_save_objects(objects) + + asserter.assert_( + CompiledSQL( + "INSERT INTO users (name) VALUES (:name)", + [{'name': 'u1'}, {'name': 'u2'}, {'name': 'u3'}] + ), + ) + assert 'id' not in objects[0].__dict__ + + def test_bulk_save_updated_include_unchanged(self): + User, = self.classes("User",) + + s = Session(expire_on_commit=False) + objects = [ + User(name="u1"), + User(name="u2"), + User(name="u3") + ] + s.add_all(objects) + s.commit() + + objects[0].name = 'u1new' + objects[2].name = 'u3new' + + s = Session() + with self.sql_execution_asserter() as asserter: + s.bulk_save_objects(objects, update_changed_only=False) + + asserter.assert_( + CompiledSQL( + "UPDATE users SET id=:id, name=:name WHERE " + "users.id = :users_id", + [{'users_id': 1, 'id': 1, 'name': 'u1new'}, + {'users_id': 2, 'id': 2, 'name': 'u2'}, + {'users_id': 3, 'id': 3, 'name': 'u3new'}] + ) + ) + + +class BulkInheritanceTest(BulkTest, fixtures.MappedTest): + @classmethod + def define_tables(cls, metadata): + Table( + 'people', metadata, + Column( + 'person_id', Integer, + primary_key=True, + test_needs_autoincrement=True), + Column('name', String(50)), + Column('type', String(30))) + + Table( + 'engineers', metadata, + Column( + 'person_id', Integer, + ForeignKey('people.person_id'), + primary_key=True), + Column('status', String(30)), + Column('primary_language', String(50))) + + Table( + 'managers', metadata, + Column( + 'person_id', Integer, + ForeignKey('people.person_id'), + primary_key=True), + Column('status', String(30)), + Column('manager_name', String(50))) + + Table( + 'boss', metadata, + Column( + 'boss_id', Integer, + ForeignKey('managers.person_id'), + primary_key=True), + Column('golf_swing', String(30))) + + @classmethod + def setup_classes(cls): + class Base(cls.Comparable): + pass + + class Person(Base): + pass + + class Engineer(Person): + pass + + class Manager(Person): + pass + + class Boss(Manager): + pass + + @classmethod + def setup_mappers(cls): + Person, Engineer, Manager, Boss = \ + cls.classes('Person', 'Engineer', 'Manager', 'Boss') + p, e, m, b = cls.tables('people', 'engineers', 'managers', 'boss') + + mapper( + Person, p, polymorphic_on=p.c.type, + polymorphic_identity='person') + mapper(Engineer, e, inherits=Person, polymorphic_identity='engineer') + mapper(Manager, m, inherits=Person, polymorphic_identity='manager') + mapper(Boss, b, inherits=Manager, polymorphic_identity='boss') + + def test_bulk_save_joined_inh_return_defaults(self): + Person, Engineer, Manager, Boss = \ + self.classes('Person', 'Engineer', 'Manager', 'Boss') + + s = Session() + objects = [ + Manager(name='m1', status='s1', manager_name='mn1'), + Engineer(name='e1', status='s2', primary_language='l1'), + Engineer(name='e2', status='s3', primary_language='l2'), + Boss( + name='b1', status='s3', manager_name='mn2', + golf_swing='g1') + ] + assert 'person_id' not in objects[0].__dict__ + + with self.sql_execution_asserter() as asserter: + s.bulk_save_objects(objects, return_defaults=True) + + asserter.assert_( + CompiledSQL( + "INSERT INTO people (name, type) VALUES (:name, :type)", + [{'type': 'manager', 'name': 'm1'}] + ), + CompiledSQL( + "INSERT INTO managers (person_id, status, manager_name) " + "VALUES (:person_id, :status, :manager_name)", + [{'person_id': 1, 'status': 's1', 'manager_name': 'mn1'}] + ), + CompiledSQL( + "INSERT INTO people (name, type) VALUES (:name, :type)", + [{'type': 'engineer', 'name': 'e1'}] + ), + CompiledSQL( + "INSERT INTO people (name, type) VALUES (:name, :type)", + [{'type': 'engineer', 'name': 'e2'}] + ), + CompiledSQL( + "INSERT INTO engineers (person_id, status, primary_language) " + "VALUES (:person_id, :status, :primary_language)", + [{'person_id': 2, 'status': 's2', 'primary_language': 'l1'}, + {'person_id': 3, 'status': 's3', 'primary_language': 'l2'}] + + ), + CompiledSQL( + "INSERT INTO people (name, type) VALUES (:name, :type)", + [{'type': 'boss', 'name': 'b1'}] + ), + CompiledSQL( + "INSERT INTO managers (person_id, status, manager_name) " + "VALUES (:person_id, :status, :manager_name)", + [{'person_id': 4, 'status': 's3', 'manager_name': 'mn2'}] + + ), + CompiledSQL( + "INSERT INTO boss (boss_id, golf_swing) VALUES " + "(:boss_id, :golf_swing)", + [{'boss_id': 4, 'golf_swing': 'g1'}] + ) + ) + eq_(objects[0].__dict__['person_id'], 1) + eq_(objects[3].__dict__['person_id'], 4) + eq_(objects[3].__dict__['boss_id'], 4) + + def test_bulk_save_joined_inh_no_defaults(self): + Person, Engineer, Manager, Boss = \ + self.classes('Person', 'Engineer', 'Manager', 'Boss') + + s = Session() + with self.sql_execution_asserter() as asserter: + s.bulk_save_objects([ + Manager( + person_id=1, + name='m1', status='s1', manager_name='mn1'), + Engineer( + person_id=2, + name='e1', status='s2', primary_language='l1'), + Engineer( + person_id=3, + name='e2', status='s3', primary_language='l2'), + Boss( + person_id=4, boss_id=4, + name='b1', status='s3', manager_name='mn2', + golf_swing='g1') + ], + + ) + + # the only difference here is that common classes are grouped together. + # at the moment it doesn't lump all the "people" tables from + # different classes together. + asserter.assert_( + CompiledSQL( + "INSERT INTO people (person_id, name, type) VALUES " + "(:person_id, :name, :type)", + [{'person_id': 1, 'type': 'manager', 'name': 'm1'}] + ), + CompiledSQL( + "INSERT INTO managers (person_id, status, manager_name) " + "VALUES (:person_id, :status, :manager_name)", + [{'status': 's1', 'person_id': 1, 'manager_name': 'mn1'}] + ), + CompiledSQL( + "INSERT INTO people (person_id, name, type) VALUES " + "(:person_id, :name, :type)", + [{'person_id': 2, 'type': 'engineer', 'name': 'e1'}, + {'person_id': 3, 'type': 'engineer', 'name': 'e2'}] + ), + CompiledSQL( + "INSERT INTO engineers (person_id, status, primary_language) " + "VALUES (:person_id, :status, :primary_language)", + [{'person_id': 2, 'status': 's2', 'primary_language': 'l1'}, + {'person_id': 3, 'status': 's3', 'primary_language': 'l2'}] + ), + CompiledSQL( + "INSERT INTO people (person_id, name, type) VALUES " + "(:person_id, :name, :type)", + [{'person_id': 4, 'type': 'boss', 'name': 'b1'}] + ), + CompiledSQL( + "INSERT INTO managers (person_id, status, manager_name) " + "VALUES (:person_id, :status, :manager_name)", + [{'status': 's3', 'person_id': 4, 'manager_name': 'mn2'}] + ), + CompiledSQL( + "INSERT INTO boss (boss_id, golf_swing) VALUES " + "(:boss_id, :golf_swing)", + [{'boss_id': 4, 'golf_swing': 'g1'}] + ) + ) + + def test_bulk_insert_joined_inh_return_defaults(self): + Person, Engineer, Manager, Boss = \ + self.classes('Person', 'Engineer', 'Manager', 'Boss') + + s = Session() + with self.sql_execution_asserter() as asserter: + s.bulk_insert_mappings( + Boss, + [ + dict( + name='b1', status='s1', manager_name='mn1', + golf_swing='g1' + ), + dict( + name='b2', status='s2', manager_name='mn2', + golf_swing='g2' + ), + dict( + name='b3', status='s3', manager_name='mn3', + golf_swing='g3' + ), + ], return_defaults=True + ) + + asserter.assert_( + CompiledSQL( + "INSERT INTO people (name) VALUES (:name)", + [{'name': 'b1'}] + ), + CompiledSQL( + "INSERT INTO people (name) VALUES (:name)", + [{'name': 'b2'}] + ), + CompiledSQL( + "INSERT INTO people (name) VALUES (:name)", + [{'name': 'b3'}] + ), + CompiledSQL( + "INSERT INTO managers (person_id, status, manager_name) " + "VALUES (:person_id, :status, :manager_name)", + [{'person_id': 1, 'status': 's1', 'manager_name': 'mn1'}, + {'person_id': 2, 'status': 's2', 'manager_name': 'mn2'}, + {'person_id': 3, 'status': 's3', 'manager_name': 'mn3'}] + + ), + CompiledSQL( + "INSERT INTO boss (boss_id, golf_swing) VALUES " + "(:boss_id, :golf_swing)", + [{'golf_swing': 'g1', 'boss_id': 1}, + {'golf_swing': 'g2', 'boss_id': 2}, + {'golf_swing': 'g3', 'boss_id': 3}] + ) + ) diff --git a/test/orm/test_cycles.py b/test/orm/test_cycles.py index 8e086ff88..c95b8d152 100644 --- a/test/orm/test_cycles.py +++ b/test/orm/test_cycles.py @@ -11,7 +11,7 @@ from sqlalchemy.testing.schema import Table, Column from sqlalchemy.orm import mapper, relationship, backref, \ create_session, sessionmaker from sqlalchemy.testing import eq_ -from sqlalchemy.testing.assertsql import RegexSQL, ExactSQL, CompiledSQL, AllOf +from sqlalchemy.testing.assertsql import RegexSQL, CompiledSQL, AllOf from sqlalchemy.testing import fixtures @@ -284,7 +284,7 @@ class InheritTestTwo(fixtures.MappedTest): Table('c', metadata, Column('id', Integer, primary_key=True, test_needs_autoincrement=True), Column('aid', Integer, - ForeignKey('a.id', use_alter=True, name="foo"))) + ForeignKey('a.id', name="foo"))) @classmethod def setup_classes(cls): @@ -334,7 +334,7 @@ class BiDirectionalManyToOneTest(fixtures.MappedTest): Column('id', Integer, primary_key=True, test_needs_autoincrement=True), Column('data', String(30)), Column('t1id', Integer, - ForeignKey('t1.id', use_alter=True, name="foo_fk"))) + ForeignKey('t1.id', name="foo_fk"))) Table('t3', metadata, Column('id', Integer, primary_key=True, test_needs_autoincrement=True), Column('data', String(30)), @@ -436,7 +436,7 @@ class BiDirectionalOneToManyTest(fixtures.MappedTest): Table('t2', metadata, Column('c1', Integer, primary_key=True, test_needs_autoincrement=True), Column('c2', Integer, - ForeignKey('t1.c1', use_alter=True, name='t1c1_fk'))) + ForeignKey('t1.c1', name='t1c1_fk'))) @classmethod def setup_classes(cls): @@ -491,7 +491,7 @@ class BiDirectionalOneToManyTest2(fixtures.MappedTest): Table('t2', metadata, Column('c1', Integer, primary_key=True, test_needs_autoincrement=True), Column('c2', Integer, - ForeignKey('t1.c1', use_alter=True, name='t1c1_fq')), + ForeignKey('t1.c1', name='t1c1_fq')), test_needs_autoincrement=True) Table('t1_data', metadata, @@ -572,7 +572,7 @@ class OneToManyManyToOneTest(fixtures.MappedTest): Table('ball', metadata, Column('id', Integer, primary_key=True, test_needs_autoincrement=True), Column('person_id', Integer, - ForeignKey('person.id', use_alter=True, name='fk_person_id')), + ForeignKey('person.id', name='fk_person_id')), Column('data', String(30))) Table('person', metadata, @@ -656,7 +656,7 @@ class OneToManyManyToOneTest(fixtures.MappedTest): RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}), RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}), RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}), - ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id " + CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id " "WHERE person.id = :person_id", lambda ctx:{'favorite_ball_id':p.favorite.id, 'person_id':p.id} ), @@ -667,11 +667,11 @@ class OneToManyManyToOneTest(fixtures.MappedTest): self.assert_sql_execution( testing.db, sess.flush, - ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id " + CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id " "WHERE person.id = :person_id", lambda ctx: {'person_id': p.id, 'favorite_ball_id': None}), - ExactSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}]) - ExactSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}]) + CompiledSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}]) + CompiledSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}]) ) def test_post_update_backref(self): @@ -1024,7 +1024,7 @@ class SelfReferentialPostUpdateTest3(fixtures.MappedTest): test_needs_autoincrement=True), Column('name', String(50), nullable=False), Column('child_id', Integer, - ForeignKey('child.id', use_alter=True, name='c1'), nullable=True)) + ForeignKey('child.id', name='c1'), nullable=True)) Table('child', metadata, Column('id', Integer, primary_key=True, @@ -1094,11 +1094,11 @@ class PostUpdateBatchingTest(fixtures.MappedTest): test_needs_autoincrement=True), Column('name', String(50), nullable=False), Column('c1_id', Integer, - ForeignKey('child1.id', use_alter=True, name='c1'), nullable=True), + ForeignKey('child1.id', name='c1'), nullable=True), Column('c2_id', Integer, - ForeignKey('child2.id', use_alter=True, name='c2'), nullable=True), + ForeignKey('child2.id', name='c2'), nullable=True), Column('c3_id', Integer, - ForeignKey('child3.id', use_alter=True, name='c3'), nullable=True) + ForeignKey('child3.id', name='c3'), nullable=True) ) Table('child1', metadata, diff --git a/test/orm/test_deferred.py b/test/orm/test_deferred.py index 1457852d8..1b777b527 100644 --- a/test/orm/test_deferred.py +++ b/test/orm/test_deferred.py @@ -2,10 +2,14 @@ import sqlalchemy as sa from sqlalchemy import testing, util from sqlalchemy.orm import mapper, deferred, defer, undefer, Load, \ load_only, undefer_group, create_session, synonym, relationship, Session,\ - joinedload, defaultload + joinedload, defaultload, aliased, contains_eager, with_polymorphic from sqlalchemy.testing import eq_, AssertsCompiledSQL, assert_raises_message from test.orm import _fixtures -from sqlalchemy.orm import strategies + + +from .inheritance._poly_fixtures import Company, Person, Engineer, Manager, \ + Boss, Machine, Paperwork, _Polymorphic + class DeferredTest(AssertsCompiledSQL, _fixtures.FixtureTest): @@ -595,3 +599,128 @@ class DeferredOptionsTest(AssertsCompiledSQL, _fixtures.FixtureTest): ) +class InheritanceTest(_Polymorphic): + __dialect__ = 'default' + + def test_load_only_subclass(self): + s = Session() + q = s.query(Manager).options(load_only("status", "manager_name")) + self.assert_compile( + q, + "SELECT managers.person_id AS managers_person_id, " + "people.person_id AS people_person_id, " + "people.type AS people_type, " + "managers.status AS managers_status, " + "managers.manager_name AS managers_manager_name " + "FROM people JOIN managers " + "ON people.person_id = managers.person_id " + "ORDER BY people.person_id" + ) + + def test_load_only_subclass_and_superclass(self): + s = Session() + q = s.query(Boss).options(load_only("status", "manager_name")) + self.assert_compile( + q, + "SELECT managers.person_id AS managers_person_id, " + "people.person_id AS people_person_id, " + "people.type AS people_type, " + "managers.status AS managers_status, " + "managers.manager_name AS managers_manager_name " + "FROM people JOIN managers " + "ON people.person_id = managers.person_id JOIN boss " + "ON managers.person_id = boss.boss_id ORDER BY people.person_id" + ) + + def test_load_only_alias_subclass(self): + s = Session() + m1 = aliased(Manager, flat=True) + q = s.query(m1).options(load_only("status", "manager_name")) + self.assert_compile( + q, + "SELECT managers_1.person_id AS managers_1_person_id, " + "people_1.person_id AS people_1_person_id, " + "people_1.type AS people_1_type, " + "managers_1.status AS managers_1_status, " + "managers_1.manager_name AS managers_1_manager_name " + "FROM people AS people_1 JOIN managers AS " + "managers_1 ON people_1.person_id = managers_1.person_id " + "ORDER BY people_1.person_id" + ) + + def test_load_only_subclass_from_relationship_polymorphic(self): + s = Session() + wp = with_polymorphic(Person, [Manager], flat=True) + q = s.query(Company).join(Company.employees.of_type(wp)).options( + contains_eager(Company.employees.of_type(wp)). + load_only(wp.Manager.status, wp.Manager.manager_name) + ) + self.assert_compile( + q, + "SELECT people_1.person_id AS people_1_person_id, " + "people_1.type AS people_1_type, " + "managers_1.person_id AS managers_1_person_id, " + "managers_1.status AS managers_1_status, " + "managers_1.manager_name AS managers_1_manager_name, " + "companies.company_id AS companies_company_id, " + "companies.name AS companies_name " + "FROM companies JOIN (people AS people_1 LEFT OUTER JOIN " + "managers AS managers_1 ON people_1.person_id = " + "managers_1.person_id) ON companies.company_id = " + "people_1.company_id" + ) + + def test_load_only_subclass_from_relationship(self): + s = Session() + from sqlalchemy import inspect + inspect(Company).add_property("managers", relationship(Manager)) + q = s.query(Company).join(Company.managers).options( + contains_eager(Company.managers). + load_only("status", "manager_name") + ) + self.assert_compile( + q, + "SELECT companies.company_id AS companies_company_id, " + "companies.name AS companies_name, " + "managers.person_id AS managers_person_id, " + "people.person_id AS people_person_id, " + "people.type AS people_type, " + "managers.status AS managers_status, " + "managers.manager_name AS managers_manager_name " + "FROM companies JOIN (people JOIN managers ON people.person_id = " + "managers.person_id) ON companies.company_id = people.company_id" + ) + + + def test_defer_on_wildcard_subclass(self): + # pretty much the same as load_only except doesn't + # exclude the primary key + + s = Session() + q = s.query(Manager).options( + defer(".*"), undefer("status")) + self.assert_compile( + q, + "SELECT managers.status AS managers_status " + "FROM people JOIN managers ON " + "people.person_id = managers.person_id ORDER BY people.person_id" + ) + + def test_defer_super_name_on_subclass(self): + s = Session() + q = s.query(Manager).options(defer("name")) + self.assert_compile( + q, + "SELECT managers.person_id AS managers_person_id, " + "people.person_id AS people_person_id, " + "people.company_id AS people_company_id, " + "people.type AS people_type, managers.status AS managers_status, " + "managers.manager_name AS managers_manager_name " + "FROM people JOIN managers " + "ON people.person_id = managers.person_id " + "ORDER BY people.person_id" + ) + + + + diff --git a/test/orm/test_loading.py b/test/orm/test_loading.py index 97c08ea29..f86477ec2 100644 --- a/test/orm/test_loading.py +++ b/test/orm/test_loading.py @@ -1,13 +1,40 @@ from . import _fixtures from sqlalchemy.orm import loading, Session, aliased -from sqlalchemy.testing.assertions import eq_ +from sqlalchemy.testing.assertions import eq_, assert_raises from sqlalchemy.util import KeyedTuple - -# class InstancesTest(_fixtures.FixtureTest): +from sqlalchemy.testing import mock # class GetFromIdentityTest(_fixtures.FixtureTest): # class LoadOnIdentTest(_fixtures.FixtureTest): # class InstanceProcessorTest(_fixture.FixtureTest): + +class InstancesTest(_fixtures.FixtureTest): + run_setup_mappers = 'once' + run_inserts = 'once' + run_deletes = None + + @classmethod + def setup_mappers(cls): + cls._setup_stock_mapping() + + def test_cursor_close_w_failed_rowproc(self): + User = self.classes.User + s = Session() + + q = s.query(User) + + ctx = q._compile_context() + cursor = mock.Mock() + q._entities = [ + mock.Mock(row_processor=mock.Mock(side_effect=Exception("boom"))) + ] + assert_raises( + Exception, + list, loading.instances(q, cursor, ctx) + ) + assert cursor.close.called, "Cursor wasn't closed" + + class MergeResultTest(_fixtures.FixtureTest): run_setup_mappers = 'once' run_inserts = 'once' diff --git a/test/orm/test_mapper.py b/test/orm/test_mapper.py index 63ba1a207..264b386d4 100644 --- a/test/orm/test_mapper.py +++ b/test/orm/test_mapper.py @@ -716,6 +716,19 @@ class MapperTest(_fixtures.FixtureTest, AssertsCompiledSQL): m3.identity_key_from_instance(AddressUser()) ) + def test_reassign_polymorphic_identity_warns(self): + User = self.classes.User + users = self.tables.users + class MyUser(User): + pass + m1 = mapper(User, users, polymorphic_on=users.c.name, + polymorphic_identity='user') + assert_raises_message( + sa.exc.SAWarning, + "Reassigning polymorphic association for identity 'user'", + mapper, + MyUser, users, inherits=User, polymorphic_identity='user' + ) def test_illegal_non_primary(self): diff --git a/test/orm/test_naturalpks.py b/test/orm/test_naturalpks.py index a4e982f84..60387ddce 100644 --- a/test/orm/test_naturalpks.py +++ b/test/orm/test_naturalpks.py @@ -1205,3 +1205,79 @@ class JoinedInheritanceTest(fixtures.MappedTest): eq_(e1.boss_name, 'pointy haired') eq_(e2.boss_name, 'pointy haired') + + +class JoinedInheritancePKOnFKTest(fixtures.MappedTest): + """Test cascades of pk->non-pk/fk on joined table inh.""" + + # mssql doesn't allow ON UPDATE on self-referential keys + __unsupported_on__ = ('mssql',) + + __requires__ = 'skip_mysql_on_windows', + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + fk_args = _backend_specific_fk_args() + + Table( + 'person', metadata, + Column('name', String(50), primary_key=True), + Column('type', String(50), nullable=False), + test_needs_fk=True) + + Table( + 'engineer', metadata, + Column( + 'id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column( + 'person_name', String(50), + ForeignKey('person.name', **fk_args)), + Column('primary_language', String(50)), + test_needs_fk=True + ) + + @classmethod + def setup_classes(cls): + + class Person(cls.Comparable): + pass + + class Engineer(Person): + pass + + def _test_pk(self, passive_updates): + Person, person, Engineer, engineer = ( + self.classes.Person, self.tables.person, + self.classes.Engineer, self.tables.engineer) + + mapper( + Person, person, polymorphic_on=person.c.type, + polymorphic_identity='person', passive_updates=passive_updates) + mapper( + Engineer, engineer, inherits=Person, + polymorphic_identity='engineer') + + sess = sa.orm.sessionmaker()() + + e1 = Engineer(name='dilbert', primary_language='java') + sess.add(e1) + sess.commit() + e1.name = 'wally' + e1.primary_language = 'c++' + + sess.flush() + + eq_(e1.person_name, 'wally') + + sess.expire_all() + eq_(e1.primary_language, "c++") + + @testing.requires.on_update_cascade + def test_pk_passive(self): + self._test_pk(True) + + #@testing.requires.non_updating_cascade + def test_pk_nonpassive(self): + self._test_pk(False) diff --git a/test/orm/test_query.py b/test/orm/test_query.py index 354bbe5b1..a2a1ee096 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -17,6 +17,9 @@ from sqlalchemy.testing.assertions import ( from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assert_warnings from test.orm import _fixtures from sqlalchemy.orm.util import join, with_parent +import contextlib +from sqlalchemy.testing import mock, is_, is_not_ +from sqlalchemy import inspect class QueryTest(_fixtures.FixtureTest): @@ -1484,7 +1487,6 @@ class SliceTest(QueryTest): assert create_session().query(User).filter(User.id == 27). \ first() is None - @testing.only_on('sqlite', 'testing execution but db-specific syntax') def test_limit_offset_applies(self): """Test that the expected LIMIT/OFFSET is applied for slices. @@ -1510,15 +1512,15 @@ class SliceTest(QueryTest): testing.db, lambda: q[:20], [ ( "SELECT users.id AS users_id, users.name " - "AS users_name FROM users LIMIT :param_1 OFFSET :param_2", - {'param_1': 20, 'param_2': 0})]) + "AS users_name FROM users LIMIT :param_1", + {'param_1': 20})]) self.assert_sql( testing.db, lambda: q[5:], [ ( "SELECT users.id AS users_id, users.name " - "AS users_name FROM users LIMIT :param_1 OFFSET :param_2", - {'param_1': -1, 'param_2': 5})]) + "AS users_name FROM users LIMIT -1 OFFSET :param_1", + {'param_1': 5})]) self.assert_sql(testing.db, lambda: q[2:2], []) @@ -3213,3 +3215,96 @@ class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL): "SELECT x HAVING x = 1", dialect=self._dialect(False) ) + + +class SessionBindTest(QueryTest): + + @contextlib.contextmanager + def _assert_bind_args(self, session): + get_bind = mock.Mock(side_effect=session.get_bind) + with mock.patch.object(session, "get_bind", get_bind): + yield + for call_ in get_bind.mock_calls: + is_(call_[1][0], inspect(self.classes.User)) + is_not_(call_[2]['clause'], None) + + def test_single_entity_q(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(User).all() + + def test_sql_expr_entity_q(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(User.id).all() + + def test_count(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(User).count() + + def test_aggregate_fn(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(func.max(User.name)).all() + + def test_bulk_update_no_sync(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(User).filter(User.id == 15).update( + {"name": "foob"}, synchronize_session=False) + + def test_bulk_delete_no_sync(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(User).filter(User.id == 15).delete( + synchronize_session=False) + + def test_bulk_update_fetch_sync(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(User).filter(User.id == 15).update( + {"name": "foob"}, synchronize_session='fetch') + + def test_bulk_delete_fetch_sync(self): + User = self.classes.User + session = Session() + with self._assert_bind_args(session): + session.query(User).filter(User.id == 15).delete( + synchronize_session='fetch') + + def test_column_property(self): + User = self.classes.User + + mapper = inspect(User) + mapper.add_property( + "score", + column_property(func.coalesce(self.tables.users.c.name, None))) + session = Session() + with self._assert_bind_args(session): + session.query(func.max(User.score)).scalar() + + def test_column_property_select(self): + User = self.classes.User + Address = self.classes.Address + + mapper = inspect(User) + mapper.add_property( + "score", + column_property( + select([func.sum(Address.id)]). + where(Address.user_id == User.id).as_scalar() + ) + ) + session = Session() + + with self._assert_bind_args(session): + session.query(func.max(User.score)).scalar() + diff --git a/test/orm/test_session.py b/test/orm/test_session.py index 96728612d..2aa0cd3eb 100644 --- a/test/orm/test_session.py +++ b/test/orm/test_session.py @@ -1364,6 +1364,9 @@ class DisposedStates(fixtures.MappedTest): def test_close(self): self._test_session().close() + def test_invalidate(self): + self._test_session().invalidate() + def test_expunge_all(self): self._test_session().expunge_all() @@ -1446,7 +1449,9 @@ class SessionInterface(fixtures.TestBase): raises_('refresh', user_arg) instance_methods = self._public_session_methods() \ - - self._class_methods + - self._class_methods - set([ + 'bulk_update_mappings', 'bulk_insert_mappings', + 'bulk_save_objects']) eq_(watchdog, instance_methods, watchdog.symmetric_difference(instance_methods)) diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py index ba31e4c7d..1d7e8e693 100644 --- a/test/orm/test_transaction.py +++ b/test/orm/test_transaction.py @@ -184,6 +184,23 @@ class SessionTransactionTest(FixtureTest): assert users.count().scalar() == 1 assert addresses.count().scalar() == 1 + @testing.requires.independent_connections + def test_invalidate(self): + User, users = self.classes.User, self.tables.users + mapper(User, users) + sess = Session() + u = User(name='u1') + sess.add(u) + sess.flush() + c1 = sess.connection(User) + + sess.invalidate() + assert c1.invalidated + + eq_(sess.query(User).all(), []) + c2 = sess.connection(User) + assert not c2.invalidated + def test_subtransaction_on_noautocommit(self): User, users = self.classes.User, self.tables.users diff --git a/test/orm/test_unitofworkv2.py b/test/orm/test_unitofworkv2.py index 374a77237..cef71370d 100644 --- a/test/orm/test_unitofworkv2.py +++ b/test/orm/test_unitofworkv2.py @@ -3,13 +3,13 @@ from sqlalchemy import testing from sqlalchemy.testing import engines from sqlalchemy.testing.schema import Table, Column from test.orm import _fixtures -from sqlalchemy import exc -from sqlalchemy.testing import fixtures -from sqlalchemy import Integer, String, ForeignKey, func +from sqlalchemy import exc, util +from sqlalchemy.testing import fixtures, config +from sqlalchemy import Integer, String, ForeignKey, func, literal from sqlalchemy.orm import mapper, relationship, backref, \ create_session, unitofwork, attributes,\ Session, exc as orm_exc -from sqlalchemy.testing.mock import Mock +from sqlalchemy.testing.mock import Mock, patch from sqlalchemy.testing.assertsql import AllOf, CompiledSQL from sqlalchemy import event @@ -1473,6 +1473,96 @@ class BasicStaleChecksTest(fixtures.MappedTest): sess.flush ) + def test_update_single_missing_broken_multi_rowcount(self): + @util.memoized_property + def rowcount(self): + if len(self.context.compiled_parameters) > 1: + return -1 + else: + return self.context.rowcount + + with patch.object( + config.db.dialect, "supports_sane_multi_rowcount", False): + with patch( + "sqlalchemy.engine.result.ResultProxy.rowcount", + rowcount): + Parent, Child = self._fixture() + sess = Session() + p1 = Parent(id=1, data=2) + sess.add(p1) + sess.flush() + + sess.execute(self.tables.parent.delete()) + + p1.data = 3 + assert_raises_message( + orm_exc.StaleDataError, + "UPDATE statement on table 'parent' expected to " + "update 1 row\(s\); 0 were matched.", + sess.flush + ) + + def test_update_multi_missing_broken_multi_rowcount(self): + @util.memoized_property + def rowcount(self): + if len(self.context.compiled_parameters) > 1: + return -1 + else: + return self.context.rowcount + + with patch.object( + config.db.dialect, "supports_sane_multi_rowcount", False): + with patch( + "sqlalchemy.engine.result.ResultProxy.rowcount", + rowcount): + Parent, Child = self._fixture() + sess = Session() + p1 = Parent(id=1, data=2) + p2 = Parent(id=2, data=3) + sess.add_all([p1, p2]) + sess.flush() + + sess.execute(self.tables.parent.delete().where(Parent.id == 1)) + + p1.data = 3 + p2.data = 4 + sess.flush() # no exception + + # update occurred for remaining row + eq_( + sess.query(Parent.id, Parent.data).all(), + [(2, 4)] + ) + + def test_update_value_missing_broken_multi_rowcount(self): + @util.memoized_property + def rowcount(self): + if len(self.context.compiled_parameters) > 1: + return -1 + else: + return self.context.rowcount + + with patch.object( + config.db.dialect, "supports_sane_multi_rowcount", False): + with patch( + "sqlalchemy.engine.result.ResultProxy.rowcount", + rowcount): + Parent, Child = self._fixture() + sess = Session() + p1 = Parent(id=1, data=1) + sess.add(p1) + sess.flush() + + sess.execute(self.tables.parent.delete()) + + p1.data = literal(1) + assert_raises_message( + orm_exc.StaleDataError, + "UPDATE statement on table 'parent' expected to " + "update 1 row\(s\); 0 were matched.", + sess.flush + ) + @testing.requires.sane_multi_rowcount def test_delete_multi_missing_warning(self): Parent, Child = self._fixture() @@ -1544,6 +1634,7 @@ class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults): T(id=10, data='t10', def_='def3'), T(id=11, data='t11'), ]) + self.assert_sql_execution( testing.db, sess.flush, diff --git a/test/orm/test_versioning.py b/test/orm/test_versioning.py index 55ce586b5..8348cb588 100644 --- a/test/orm/test_versioning.py +++ b/test/orm/test_versioning.py @@ -1,7 +1,8 @@ import datetime import sqlalchemy as sa -from sqlalchemy.testing import engines +from sqlalchemy.testing import engines, config from sqlalchemy import testing +from sqlalchemy.testing.mock import patch from sqlalchemy import ( Integer, String, Date, ForeignKey, orm, exc, select, TypeDecorator) from sqlalchemy.testing.schema import Table, Column @@ -12,6 +13,7 @@ from sqlalchemy.testing import ( eq_, assert_raises, assert_raises_message, fixtures) from sqlalchemy.testing.assertsql import CompiledSQL import uuid +from sqlalchemy import util def make_uuid(): @@ -223,6 +225,30 @@ class VersioningTest(fixtures.MappedTest): s1.refresh(f1s1, lockmode='update_nowait') assert f1s1.version_id == f1s2.version_id + def test_update_multi_missing_broken_multi_rowcount(self): + @util.memoized_property + def rowcount(self): + if len(self.context.compiled_parameters) > 1: + return -1 + else: + return self.context.rowcount + + with patch.object( + config.db.dialect, "supports_sane_multi_rowcount", False): + with patch( + "sqlalchemy.engine.result.ResultProxy.rowcount", + rowcount): + + Foo = self.classes.Foo + s1 = self._fixture() + f1s1 = Foo(value='f1 value') + s1.add(f1s1) + s1.commit() + + f1s1.value = 'f2 value' + s1.flush() + eq_(f1s1.version_id, 2) + @testing.emits_warning(r'.*does not support updated rowcount') @engines.close_open_connections def test_noversioncheck(self): |
