diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-10-06 12:10:01 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-10-06 12:10:01 -0400 |
| commit | b7b242dbb5709aab76d1a035db3d5a72d079740c (patch) | |
| tree | df7ea20bcfb2cc00d04fa64b1948feb370310af6 /test/orm/test_session.py | |
| parent | 70be21311772c687105802850380050bfeb4bd82 (diff) | |
| download | sqlalchemy-b7b242dbb5709aab76d1a035db3d5a72d079740c.tar.gz | |
- break session tests into smaller suites. still some ambiguity what
some of these tests are testing
Diffstat (limited to 'test/orm/test_session.py')
| -rw-r--r-- | test/orm/test_session.py | 856 |
1 files changed, 439 insertions, 417 deletions
diff --git a/test/orm/test_session.py b/test/orm/test_session.py index b9cc41d28..db3d505c3 100644 --- a/test/orm/test_session.py +++ b/test/orm/test_session.py @@ -18,68 +18,9 @@ from sqlalchemy.testing import fixtures from test.orm import _fixtures from sqlalchemy import event, ForeignKey - -class SessionTest(_fixtures.FixtureTest): +class BindTest(_fixtures.FixtureTest): run_inserts = None - def test_no_close_on_flush(self): - """Flush() doesn't close a connection the session didn't open""" - - User, users = self.classes.User, self.tables.users - - c = testing.db.connect() - c.execute("select * from users") - - mapper(User, users) - s = create_session(bind=c) - s.add(User(name='first')) - s.flush() - c.execute("select * from users") - - def test_close(self): - """close() doesn't close a connection the session didn't open""" - - User, users = self.classes.User, self.tables.users - - c = testing.db.connect() - c.execute("select * from users") - - mapper(User, users) - s = create_session(bind=c) - s.add(User(name='first')) - s.flush() - c.execute("select * from users") - s.close() - c.execute("select * from users") - - def test_object_session_raises(self): - User = self.classes.User - - assert_raises( - orm_exc.UnmappedInstanceError, - object_session, - object() - ) - - assert_raises( - orm_exc.UnmappedInstanceError, - object_session, - User() - ) - - @testing.requires.sequences - def test_sequence_execute(self): - seq = Sequence("some_sequence") - seq.create(testing.db) - try: - sess = create_session(bind=testing.db) - eq_(sess.execute(seq), 1) - finally: - seq.drop(testing.db) - - - - @engines.close_open_connections def test_mapped_binds(self): Address, addresses, users, User = (self.classes.Address, self.tables.addresses, @@ -122,7 +63,6 @@ class SessionTest(_fixtures.FixtureTest): sess.close() - @engines.close_open_connections def test_table_binds(self): Address, addresses, users, User = (self.classes.Address, self.tables.addresses, @@ -163,7 +103,6 @@ class SessionTest(_fixtures.FixtureTest): sess.close() - @engines.close_open_connections def test_bind_from_metadata(self): users, User = self.tables.users, self.classes.User @@ -179,6 +118,178 @@ class SessionTest(_fixtures.FixtureTest): assert len(session.query(User).filter_by(name='Johnny').all()) == 0 session.close() + def test_bind_arguments(self): + users, Address, addresses, User = (self.tables.users, + self.classes.Address, + self.tables.addresses, + self.classes.User) + + mapper(User, users) + mapper(Address, addresses) + + e1 = engines.testing_engine() + e2 = engines.testing_engine() + e3 = engines.testing_engine() + + sess = Session(e3) + sess.bind_mapper(User, e1) + sess.bind_mapper(Address, e2) + + assert sess.connection().engine is e3 + assert sess.connection(bind=e1).engine is e1 + assert sess.connection(mapper=Address, bind=e1).engine is e1 + assert sess.connection(mapper=Address).engine is e2 + assert sess.connection(clause=addresses.select()).engine is e2 + assert sess.connection(mapper=User, + clause=addresses.select()).engine is e1 + assert sess.connection(mapper=User, + clause=addresses.select(), + bind=e2).engine is e2 + + sess.close() + + @engines.close_open_connections + def test_bound_connection(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + c = testing.db.connect() + sess = create_session(bind=c) + sess.begin() + transaction = sess.transaction + u = User(name='u1') + sess.add(u) + sess.flush() + assert transaction._connection_for_bind(testing.db) \ + is transaction._connection_for_bind(c) is c + + assert_raises_message(sa.exc.InvalidRequestError, + 'Session already has a Connection ' + 'associated', + transaction._connection_for_bind, + testing.db.connect()) + transaction.rollback() + assert len(sess.query(User).all()) == 0 + sess.close() + + def test_bound_connection_transactional(self): + User, users = self.classes.User, self.tables.users + + mapper(User, users) + c = testing.db.connect() + + sess = create_session(bind=c, autocommit=False) + u = User(name='u1') + sess.add(u) + sess.flush() + sess.close() + assert not c.in_transaction() + assert c.scalar("select count(1) from users") == 0 + + sess = create_session(bind=c, autocommit=False) + u = User(name='u2') + sess.add(u) + sess.flush() + sess.commit() + assert not c.in_transaction() + assert c.scalar("select count(1) from users") == 1 + c.execute("delete from users") + assert c.scalar("select count(1) from users") == 0 + + c = testing.db.connect() + + trans = c.begin() + sess = create_session(bind=c, autocommit=True) + u = User(name='u3') + sess.add(u) + sess.flush() + assert c.in_transaction() + trans.commit() + assert not c.in_transaction() + assert c.scalar("select count(1) from users") == 1 + +class ExecutionTest(_fixtures.FixtureTest): + run_inserts = None + + @testing.requires.sequences + def test_sequence_execute(self): + seq = Sequence("some_sequence") + seq.create(testing.db) + try: + sess = create_session(bind=testing.db) + eq_(sess.execute(seq), 1) + finally: + seq.drop(testing.db) + + def test_textual_execute(self): + """test that Session.execute() converts to text()""" + + users = self.tables.users + + + sess = create_session(bind=self.metadata.bind) + users.insert().execute(id=7, name='jack') + + # use :bindparam style + eq_(sess.execute("select * from users where id=:id", + {'id':7}).fetchall(), + [(7, u'jack')]) + + + # use :bindparam style + eq_(sess.scalar("select id from users where id=:id", + {'id':7}), + 7) + + def test_parameter_execute(self): + users = self.tables.users + sess = Session(bind=testing.db) + sess.execute(users.insert(), [ + {"id": 7, "name": "u7"}, + {"id": 8, "name": "u8"} + ] + ) + sess.execute(users.insert(), {"id": 9, "name": "u9"}) + eq_( + sess.execute(sa.select([users.c.id]).\ + order_by(users.c.id)).fetchall(), + [(7, ), (8, ), (9, )] + ) + + +class TransScopingTest(_fixtures.FixtureTest): + run_inserts = None + + def test_no_close_on_flush(self): + """Flush() doesn't close a connection the session didn't open""" + + User, users = self.classes.User, self.tables.users + + c = testing.db.connect() + c.execute("select * from users") + + mapper(User, users) + s = create_session(bind=c) + s.add(User(name='first')) + s.flush() + c.execute("select * from users") + + def test_close(self): + """close() doesn't close a connection the session didn't open""" + + User, users = self.classes.User, self.tables.users + + c = testing.db.connect() + c.execute("select * from users") + + mapper(User, users) + s = create_session(bind=c) + s.add(User(name='first')) + s.flush() + c.execute("select * from users") + s.close() + c.execute("select * from users") + @testing.requires.independent_connections @engines.close_open_connections def test_transaction(self): @@ -201,45 +312,23 @@ class SessionTest(_fixtures.FixtureTest): ).scalar() == 1 sess.close() - @testing.requires.independent_connections - @engines.close_open_connections - def test_autoflush(self): - User, users = self.classes.User, self.tables.users - - bind = self.metadata.bind - mapper(User, users) - conn1 = bind.connect() - conn2 = bind.connect() - - sess = create_session(bind=conn1, autocommit=False, autoflush=True) - u = User() - u.name='ed' - sess.add(u) - u2 = sess.query(User).filter_by(name='ed').one() - assert u2 is u - eq_(conn1.execute("select count(1) from users").scalar(), 1) - eq_(conn2.execute("select count(1) from users").scalar(), 0) - sess.commit() - eq_(conn1.execute("select count(1) from users").scalar(), 1) - eq_(bind.connect().execute("select count(1) from users").scalar(), 1) - sess.close() +class SessionUtilTest(_fixtures.FixtureTest): + run_inserts = None - @testing.requires.python26 - def test_with_no_autoflush(self): - User, users = self.classes.User, self.tables.users + def test_object_session_raises(self): + User = self.classes.User - mapper(User, users) - sess = Session() + assert_raises( + orm_exc.UnmappedInstanceError, + object_session, + object() + ) - u = User() - u.name = 'ed' - sess.add(u) - def go(obj): - assert u not in sess.query(User).all() - testing.run_as_contextmanager(sess.no_autoflush, go) - assert u in sess.new - assert u in sess.query(User).all() - assert u not in sess.new + assert_raises( + orm_exc.UnmappedInstanceError, + object_session, + User() + ) def test_make_transient(self): users, User = self.tables.users, self.classes.User @@ -303,6 +392,51 @@ class SessionTest(_fixtures.FixtureTest): make_transient(u1) sess.rollback() +class SessionStateTest(_fixtures.FixtureTest): + run_inserts = None + + + @testing.requires.independent_connections + @engines.close_open_connections + def test_autoflush(self): + User, users = self.classes.User, self.tables.users + + bind = self.metadata.bind + mapper(User, users) + conn1 = bind.connect() + conn2 = bind.connect() + + sess = create_session(bind=conn1, autocommit=False, autoflush=True) + u = User() + u.name = 'ed' + sess.add(u) + u2 = sess.query(User).filter_by(name='ed').one() + assert u2 is u + eq_(conn1.execute("select count(1) from users").scalar(), 1) + eq_(conn2.execute("select count(1) from users").scalar(), 0) + sess.commit() + eq_(conn1.execute("select count(1) from users").scalar(), 1) + eq_(bind.connect().execute("select count(1) from users").scalar(), 1) + sess.close() + + @testing.requires.python26 + def test_with_no_autoflush(self): + User, users = self.classes.User, self.tables.users + + mapper(User, users) + sess = Session() + + u = User() + u.name = 'ed' + sess.add(u) + def go(obj): + assert u not in sess.query(User).all() + testing.run_as_contextmanager(sess.no_autoflush, go) + assert u in sess.new + assert u in sess.query(User).all() + assert u not in sess.new + + def test_deleted_flag(self): users, User = self.tables.users, self.classes.User @@ -431,131 +565,6 @@ class SessionTest(_fixtures.FixtureTest): sess.rollback() assert not sess.is_active - def test_textual_execute(self): - """test that Session.execute() converts to text()""" - - users = self.tables.users - - - sess = create_session(bind=self.metadata.bind) - users.insert().execute(id=7, name='jack') - - # use :bindparam style - eq_(sess.execute("select * from users where id=:id", - {'id':7}).fetchall(), - [(7, u'jack')]) - - - # use :bindparam style - eq_(sess.scalar("select id from users where id=:id", - {'id':7}), - 7) - - def test_parameter_execute(self): - users = self.tables.users - sess = Session(bind=testing.db) - sess.execute(users.insert(), [ - {"id": 7, "name": "u7"}, - {"id": 8, "name": "u8"} - ] - ) - sess.execute(users.insert(), {"id": 9, "name": "u9"}) - eq_( - sess.execute(sa.select([users.c.id]).\ - order_by(users.c.id)).fetchall(), - [(7, ), (8, ), (9, )] - ) - - - @engines.close_open_connections - def test_bound_connection(self): - users, User = self.tables.users, self.classes.User - - mapper(User, users) - c = testing.db.connect() - sess = create_session(bind=c) - sess.begin() - transaction = sess.transaction - u = User(name='u1') - sess.add(u) - sess.flush() - assert transaction._connection_for_bind(testing.db) \ - is transaction._connection_for_bind(c) is c - - assert_raises_message(sa.exc.InvalidRequestError, - 'Session already has a Connection ' - 'associated', - transaction._connection_for_bind, - testing.db.connect()) - transaction.rollback() - assert len(sess.query(User).all()) == 0 - sess.close() - - def test_bound_connection_transactional(self): - User, users = self.classes.User, self.tables.users - - mapper(User, users) - c = testing.db.connect() - - sess = create_session(bind=c, autocommit=False) - u = User(name='u1') - sess.add(u) - sess.flush() - sess.close() - assert not c.in_transaction() - assert c.scalar("select count(1) from users") == 0 - - sess = create_session(bind=c, autocommit=False) - u = User(name='u2') - sess.add(u) - sess.flush() - sess.commit() - assert not c.in_transaction() - assert c.scalar("select count(1) from users") == 1 - c.execute("delete from users") - assert c.scalar("select count(1) from users") == 0 - - c = testing.db.connect() - - trans = c.begin() - sess = create_session(bind=c, autocommit=True) - u = User(name='u3') - sess.add(u) - sess.flush() - assert c.in_transaction() - trans.commit() - assert not c.in_transaction() - assert c.scalar("select count(1) from users") == 1 - - def test_bind_arguments(self): - users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) - - mapper(User, users) - mapper(Address, addresses) - - e1 = engines.testing_engine() - e2 = engines.testing_engine() - e3 = engines.testing_engine() - - sess = Session(e3) - sess.bind_mapper(User, e1) - sess.bind_mapper(Address, e2) - - assert sess.connection().engine is e3 - assert sess.connection(bind=e1).engine is e1 - assert sess.connection(mapper=Address, bind=e1).engine is e1 - assert sess.connection(mapper=Address).engine is e2 - assert sess.connection(clause=addresses.select()).engine is e2 - assert sess.connection(mapper=User, - clause=addresses.select()).engine is e1 - assert sess.connection(mapper=User, - clause=addresses.select(), - bind=e2).engine is e2 - - sess.close() @engines.close_open_connections def test_add_delete(self): @@ -617,6 +626,180 @@ class SessionTest(_fixtures.FixtureTest): assert user not in s assert s.query(User).count() == 0 + + @testing.uses_deprecated() + def test_identity_conflict(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + for s in ( + create_session(), + create_session(weak_identity_map=False), + ): + users.delete().execute() + u1 = User(name="ed") + s.add(u1) + s.flush() + s.expunge(u1) + u2 = s.query(User).first() + s.expunge(u2) + s.identity_map.add(sa.orm.attributes.instance_state(u1)) + + assert_raises(AssertionError, s.identity_map.add, + sa.orm.attributes.instance_state(u2)) + + def test_pickled_update(self): + users, User = self.tables.users, pickleable.User + + mapper(User, users) + sess1 = create_session() + sess2 = create_session() + u1 = User(name='u1') + sess1.add(u1) + assert_raises_message(sa.exc.InvalidRequestError, + 'already attached to session', sess2.add, + u1) + u2 = pickle.loads(pickle.dumps(u1)) + sess2.add(u2) + + def test_duplicate_update(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + Session = sessionmaker() + sess = Session() + + u1 = User(name='u1') + sess.add(u1) + sess.flush() + assert u1.id is not None + + sess.expunge(u1) + + assert u1 not in sess + assert Session.object_session(u1) is None + + u2 = sess.query(User).get(u1.id) + assert u2 is not None and u2 is not u1 + assert u2 in sess + + assert_raises(Exception, lambda: sess.add(u1)) + + sess.expunge(u2) + assert u2 not in sess + assert Session.object_session(u2) is None + + u1.name = "John" + u2.name = "Doe" + + sess.add(u1) + assert u1 in sess + assert Session.object_session(u1) is sess + + sess.flush() + + sess.expunge_all() + + u3 = sess.query(User).get(u1.id) + assert u3 is not u1 and u3 is not u2 and u3.name == u1.name + + def test_no_double_save(self): + users = self.tables.users + + sess = create_session() + class Foo(object): + def __init__(self): + sess.add(self) + class Bar(Foo): + def __init__(self): + sess.add(self) + Foo.__init__(self) + mapper(Foo, users) + mapper(Bar, users) + + b = Bar() + assert b in sess + assert len(list(sess)) == 1 + + def test_identity_map_mutate(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + + sess = Session() + + sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')]) + sess.commit() + + # TODO: what are we testing here ? that iteritems() can + # withstand a change? should this be + # more directly attempting to manipulate the identity_map ? + u1, u2, u3 = sess.query(User).all() + for i, (key, value) in enumerate(sess.identity_map.iteritems()): + if i == 2: + del u3 + gc_collect() + +class SessionStateWFixtureTest(_fixtures.FixtureTest): + + def test_autoflush_rollback(self): + Address, addresses, users, User = (self.classes.Address, + self.tables.addresses, + self.tables.users, + self.classes.User) + + mapper(Address, addresses) + mapper(User, users, properties={ + 'addresses':relationship(Address)}) + + sess = create_session(autocommit=False, autoflush=True) + u = sess.query(User).get(8) + newad = Address(email_address='a new address') + u.addresses.append(newad) + u.name = 'some new name' + assert u.name == 'some new name' + assert len(u.addresses) == 4 + assert newad in u.addresses + sess.rollback() + assert u.name == 'ed' + assert len(u.addresses) == 3 + + assert newad not in u.addresses + # pending objects dont get expired + assert newad.email_address == 'a new address' + + def test_expunge_cascade(self): + Address, addresses, users, User = (self.classes.Address, + self.tables.addresses, + self.tables.users, + self.classes.User) + + mapper(Address, addresses) + mapper(User, users, properties={ + 'addresses':relationship(Address, + backref=backref("user", cascade="all"), + cascade="all")}) + + session = create_session() + u = session.query(User).filter_by(id=7).one() + + # get everything to load in both directions + print [a.user for a in u.addresses] + + # then see if expunge fails + session.expunge(u) + + assert sa.orm.object_session(u) is None + assert sa.orm.attributes.instance_state(u).session_id is None + for a in u.addresses: + assert sa.orm.object_session(a) is None + assert sa.orm.attributes.instance_state(a).session_id is None + + + +class WeakIdentityMapTest(_fixtures.FixtureTest): + run_inserts = None + @testing.requires.predictable_gc def test_weakref(self): """test the weak-referencing identity map, which strongly- @@ -685,28 +868,6 @@ class SessionTest(_fixtures.FixtureTest): assert not s.identity_map - @testing.uses_deprecated() - def test_identity_conflict(self): - users, User = self.tables.users, self.classes.User - - mapper(User, users) - for s in ( - create_session(), - create_session(weak_identity_map=False), - ): - users.delete().execute() - u1 = User(name="ed") - s.add(u1) - s.flush() - s.expunge(u1) - u2 = s.query(User).first() - s.expunge(u2) - s.identity_map.add(sa.orm.attributes.instance_state(u1)) - - assert_raises(AssertionError, s.identity_map.add, - sa.orm.attributes.instance_state(u2)) - - @testing.requires.predictable_gc def test_weakref_with_cycles_o2m(self): Address, addresses, users, User = (self.classes.Address, @@ -776,6 +937,39 @@ class SessionTest(_fixtures.FixtureTest): user = s.query(User).options(joinedload(User.address)).one() eq_(user, User(name="ed", address=Address(email_address="ed2"))) + def test_auto_detach_on_gc_session(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + + sess = Session() + + u1 = User(name='u1') + sess.add(u1) + sess.commit() + + # can't add u1 to Session, + # already belongs to u2 + s2 = Session() + assert_raises_message( + sa.exc.InvalidRequestError, + r".*is already attached to session", + s2.add, u1 + ) + + # garbage collect sess + del sess + gc_collect() + + # s2 lets it in now despite u1 having + # session_key + s2.add(u1) + assert u1 in s2 + + +class StrongIdentityMapTest(_fixtures.FixtureTest): + run_inserts = None + @testing.uses_deprecated() def test_strong_ref(self): users, User = self.tables.users, self.classes.User @@ -853,178 +1047,6 @@ class SessionTest(_fixtures.FixtureTest): self.assert_(len(s.identity_map) == 0) - def test_pickled_update(self): - users, User = self.tables.users, pickleable.User - - mapper(User, users) - sess1 = create_session() - sess2 = create_session() - u1 = User(name='u1') - sess1.add(u1) - assert_raises_message(sa.exc.InvalidRequestError, - 'already attached to session', sess2.add, - u1) - u2 = pickle.loads(pickle.dumps(u1)) - sess2.add(u2) - - def test_duplicate_update(self): - users, User = self.tables.users, self.classes.User - - mapper(User, users) - Session = sessionmaker() - sess = Session() - - u1 = User(name='u1') - sess.add(u1) - sess.flush() - assert u1.id is not None - - sess.expunge(u1) - - assert u1 not in sess - assert Session.object_session(u1) is None - - u2 = sess.query(User).get(u1.id) - assert u2 is not None and u2 is not u1 - assert u2 in sess - - assert_raises(Exception, lambda: sess.add(u1)) - - sess.expunge(u2) - assert u2 not in sess - assert Session.object_session(u2) is None - - u1.name = "John" - u2.name = "Doe" - - sess.add(u1) - assert u1 in sess - assert Session.object_session(u1) is sess - - sess.flush() - - sess.expunge_all() - - u3 = sess.query(User).get(u1.id) - assert u3 is not u1 and u3 is not u2 and u3.name == u1.name - - def test_no_double_save(self): - users = self.tables.users - - sess = create_session() - class Foo(object): - def __init__(self): - sess.add(self) - class Bar(Foo): - def __init__(self): - sess.add(self) - Foo.__init__(self) - mapper(Foo, users) - mapper(Bar, users) - - b = Bar() - assert b in sess - assert len(list(sess)) == 1 - - def test_identity_map_mutate(self): - users, User = self.tables.users, self.classes.User - - mapper(User, users) - - sess = Session() - - sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')]) - sess.commit() - - u1, u2, u3 = sess.query(User).all() - for i, (key, value) in enumerate(sess.identity_map.iteritems()): - if i == 2: - del u3 - gc_collect() - - def test_auto_detach_on_gc_session(self): - users, User = self.tables.users, self.classes.User - - mapper(User, users) - - sess = Session() - - u1 = User(name='u1') - sess.add(u1) - sess.commit() - - # can't add u1 to Session, - # already belongs to u2 - s2 = Session() - assert_raises_message( - sa.exc.InvalidRequestError, - r".*is already attached to session", - s2.add, u1 - ) - - # garbage collect sess - del sess - gc_collect() - - # s2 lets it in now despite u1 having - # session_key - s2.add(u1) - assert u1 in s2 - -class SessionDataTest(_fixtures.FixtureTest): - def test_expunge_cascade(self): - Address, addresses, users, User = (self.classes.Address, - self.tables.addresses, - self.tables.users, - self.classes.User) - - mapper(Address, addresses) - mapper(User, users, properties={ - 'addresses':relationship(Address, - backref=backref("user", cascade="all"), - cascade="all")}) - - session = create_session() - u = session.query(User).filter_by(id=7).one() - - # get everything to load in both directions - print [a.user for a in u.addresses] - - # then see if expunge fails - session.expunge(u) - - assert sa.orm.object_session(u) is None - assert sa.orm.attributes.instance_state(u).session_id is None - for a in u.addresses: - assert sa.orm.object_session(a) is None - assert sa.orm.attributes.instance_state(a).session_id is None - - def test_autoflush_rollback(self): - Address, addresses, users, User = (self.classes.Address, - self.tables.addresses, - self.tables.users, - self.classes.User) - - mapper(Address, addresses) - mapper(User, users, properties={ - 'addresses':relationship(Address)}) - - sess = create_session(autocommit=False, autoflush=True) - u = sess.query(User).get(8) - newad = Address(email_address='a new address') - u.addresses.append(newad) - u.name = 'some new name' - assert u.name == 'some new name' - assert len(u.addresses) == 4 - assert newad in u.addresses - sess.rollback() - assert u.name == 'ed' - assert len(u.addresses) == 3 - - assert newad not in u.addresses - # pending objects dont get expired - assert newad.email_address == 'a new address' - class IsModifiedTest(_fixtures.FixtureTest): run_inserts = None |
