summaryrefslogtreecommitdiff
path: root/test/orm/test_session.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-10-06 12:10:01 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-10-06 12:10:01 -0400
commitb7b242dbb5709aab76d1a035db3d5a72d079740c (patch)
treedf7ea20bcfb2cc00d04fa64b1948feb370310af6 /test/orm/test_session.py
parent70be21311772c687105802850380050bfeb4bd82 (diff)
downloadsqlalchemy-b7b242dbb5709aab76d1a035db3d5a72d079740c.tar.gz
- break session tests into smaller suites. still some ambiguity what
some of these tests are testing
Diffstat (limited to 'test/orm/test_session.py')
-rw-r--r--test/orm/test_session.py856
1 files changed, 439 insertions, 417 deletions
diff --git a/test/orm/test_session.py b/test/orm/test_session.py
index b9cc41d28..db3d505c3 100644
--- a/test/orm/test_session.py
+++ b/test/orm/test_session.py
@@ -18,68 +18,9 @@ from sqlalchemy.testing import fixtures
from test.orm import _fixtures
from sqlalchemy import event, ForeignKey
-
-class SessionTest(_fixtures.FixtureTest):
+class BindTest(_fixtures.FixtureTest):
run_inserts = None
- def test_no_close_on_flush(self):
- """Flush() doesn't close a connection the session didn't open"""
-
- User, users = self.classes.User, self.tables.users
-
- c = testing.db.connect()
- c.execute("select * from users")
-
- mapper(User, users)
- s = create_session(bind=c)
- s.add(User(name='first'))
- s.flush()
- c.execute("select * from users")
-
- def test_close(self):
- """close() doesn't close a connection the session didn't open"""
-
- User, users = self.classes.User, self.tables.users
-
- c = testing.db.connect()
- c.execute("select * from users")
-
- mapper(User, users)
- s = create_session(bind=c)
- s.add(User(name='first'))
- s.flush()
- c.execute("select * from users")
- s.close()
- c.execute("select * from users")
-
- def test_object_session_raises(self):
- User = self.classes.User
-
- assert_raises(
- orm_exc.UnmappedInstanceError,
- object_session,
- object()
- )
-
- assert_raises(
- orm_exc.UnmappedInstanceError,
- object_session,
- User()
- )
-
- @testing.requires.sequences
- def test_sequence_execute(self):
- seq = Sequence("some_sequence")
- seq.create(testing.db)
- try:
- sess = create_session(bind=testing.db)
- eq_(sess.execute(seq), 1)
- finally:
- seq.drop(testing.db)
-
-
-
- @engines.close_open_connections
def test_mapped_binds(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
@@ -122,7 +63,6 @@ class SessionTest(_fixtures.FixtureTest):
sess.close()
- @engines.close_open_connections
def test_table_binds(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
@@ -163,7 +103,6 @@ class SessionTest(_fixtures.FixtureTest):
sess.close()
- @engines.close_open_connections
def test_bind_from_metadata(self):
users, User = self.tables.users, self.classes.User
@@ -179,6 +118,178 @@ class SessionTest(_fixtures.FixtureTest):
assert len(session.query(User).filter_by(name='Johnny').all()) == 0
session.close()
+ def test_bind_arguments(self):
+ users, Address, addresses, User = (self.tables.users,
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
+
+ mapper(User, users)
+ mapper(Address, addresses)
+
+ e1 = engines.testing_engine()
+ e2 = engines.testing_engine()
+ e3 = engines.testing_engine()
+
+ sess = Session(e3)
+ sess.bind_mapper(User, e1)
+ sess.bind_mapper(Address, e2)
+
+ assert sess.connection().engine is e3
+ assert sess.connection(bind=e1).engine is e1
+ assert sess.connection(mapper=Address, bind=e1).engine is e1
+ assert sess.connection(mapper=Address).engine is e2
+ assert sess.connection(clause=addresses.select()).engine is e2
+ assert sess.connection(mapper=User,
+ clause=addresses.select()).engine is e1
+ assert sess.connection(mapper=User,
+ clause=addresses.select(),
+ bind=e2).engine is e2
+
+ sess.close()
+
+ @engines.close_open_connections
+ def test_bound_connection(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+ c = testing.db.connect()
+ sess = create_session(bind=c)
+ sess.begin()
+ transaction = sess.transaction
+ u = User(name='u1')
+ sess.add(u)
+ sess.flush()
+ assert transaction._connection_for_bind(testing.db) \
+ is transaction._connection_for_bind(c) is c
+
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'Session already has a Connection '
+ 'associated',
+ transaction._connection_for_bind,
+ testing.db.connect())
+ transaction.rollback()
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ def test_bound_connection_transactional(self):
+ User, users = self.classes.User, self.tables.users
+
+ mapper(User, users)
+ c = testing.db.connect()
+
+ sess = create_session(bind=c, autocommit=False)
+ u = User(name='u1')
+ sess.add(u)
+ sess.flush()
+ sess.close()
+ assert not c.in_transaction()
+ assert c.scalar("select count(1) from users") == 0
+
+ sess = create_session(bind=c, autocommit=False)
+ u = User(name='u2')
+ sess.add(u)
+ sess.flush()
+ sess.commit()
+ assert not c.in_transaction()
+ assert c.scalar("select count(1) from users") == 1
+ c.execute("delete from users")
+ assert c.scalar("select count(1) from users") == 0
+
+ c = testing.db.connect()
+
+ trans = c.begin()
+ sess = create_session(bind=c, autocommit=True)
+ u = User(name='u3')
+ sess.add(u)
+ sess.flush()
+ assert c.in_transaction()
+ trans.commit()
+ assert not c.in_transaction()
+ assert c.scalar("select count(1) from users") == 1
+
+class ExecutionTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ @testing.requires.sequences
+ def test_sequence_execute(self):
+ seq = Sequence("some_sequence")
+ seq.create(testing.db)
+ try:
+ sess = create_session(bind=testing.db)
+ eq_(sess.execute(seq), 1)
+ finally:
+ seq.drop(testing.db)
+
+ def test_textual_execute(self):
+ """test that Session.execute() converts to text()"""
+
+ users = self.tables.users
+
+
+ sess = create_session(bind=self.metadata.bind)
+ users.insert().execute(id=7, name='jack')
+
+ # use :bindparam style
+ eq_(sess.execute("select * from users where id=:id",
+ {'id':7}).fetchall(),
+ [(7, u'jack')])
+
+
+ # use :bindparam style
+ eq_(sess.scalar("select id from users where id=:id",
+ {'id':7}),
+ 7)
+
+ def test_parameter_execute(self):
+ users = self.tables.users
+ sess = Session(bind=testing.db)
+ sess.execute(users.insert(), [
+ {"id": 7, "name": "u7"},
+ {"id": 8, "name": "u8"}
+ ]
+ )
+ sess.execute(users.insert(), {"id": 9, "name": "u9"})
+ eq_(
+ sess.execute(sa.select([users.c.id]).\
+ order_by(users.c.id)).fetchall(),
+ [(7, ), (8, ), (9, )]
+ )
+
+
+class TransScopingTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ def test_no_close_on_flush(self):
+ """Flush() doesn't close a connection the session didn't open"""
+
+ User, users = self.classes.User, self.tables.users
+
+ c = testing.db.connect()
+ c.execute("select * from users")
+
+ mapper(User, users)
+ s = create_session(bind=c)
+ s.add(User(name='first'))
+ s.flush()
+ c.execute("select * from users")
+
+ def test_close(self):
+ """close() doesn't close a connection the session didn't open"""
+
+ User, users = self.classes.User, self.tables.users
+
+ c = testing.db.connect()
+ c.execute("select * from users")
+
+ mapper(User, users)
+ s = create_session(bind=c)
+ s.add(User(name='first'))
+ s.flush()
+ c.execute("select * from users")
+ s.close()
+ c.execute("select * from users")
+
@testing.requires.independent_connections
@engines.close_open_connections
def test_transaction(self):
@@ -201,45 +312,23 @@ class SessionTest(_fixtures.FixtureTest):
).scalar() == 1
sess.close()
- @testing.requires.independent_connections
- @engines.close_open_connections
- def test_autoflush(self):
- User, users = self.classes.User, self.tables.users
-
- bind = self.metadata.bind
- mapper(User, users)
- conn1 = bind.connect()
- conn2 = bind.connect()
-
- sess = create_session(bind=conn1, autocommit=False, autoflush=True)
- u = User()
- u.name='ed'
- sess.add(u)
- u2 = sess.query(User).filter_by(name='ed').one()
- assert u2 is u
- eq_(conn1.execute("select count(1) from users").scalar(), 1)
- eq_(conn2.execute("select count(1) from users").scalar(), 0)
- sess.commit()
- eq_(conn1.execute("select count(1) from users").scalar(), 1)
- eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
- sess.close()
+class SessionUtilTest(_fixtures.FixtureTest):
+ run_inserts = None
- @testing.requires.python26
- def test_with_no_autoflush(self):
- User, users = self.classes.User, self.tables.users
+ def test_object_session_raises(self):
+ User = self.classes.User
- mapper(User, users)
- sess = Session()
+ assert_raises(
+ orm_exc.UnmappedInstanceError,
+ object_session,
+ object()
+ )
- u = User()
- u.name = 'ed'
- sess.add(u)
- def go(obj):
- assert u not in sess.query(User).all()
- testing.run_as_contextmanager(sess.no_autoflush, go)
- assert u in sess.new
- assert u in sess.query(User).all()
- assert u not in sess.new
+ assert_raises(
+ orm_exc.UnmappedInstanceError,
+ object_session,
+ User()
+ )
def test_make_transient(self):
users, User = self.tables.users, self.classes.User
@@ -303,6 +392,51 @@ class SessionTest(_fixtures.FixtureTest):
make_transient(u1)
sess.rollback()
+class SessionStateTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+
+ @testing.requires.independent_connections
+ @engines.close_open_connections
+ def test_autoflush(self):
+ User, users = self.classes.User, self.tables.users
+
+ bind = self.metadata.bind
+ mapper(User, users)
+ conn1 = bind.connect()
+ conn2 = bind.connect()
+
+ sess = create_session(bind=conn1, autocommit=False, autoflush=True)
+ u = User()
+ u.name = 'ed'
+ sess.add(u)
+ u2 = sess.query(User).filter_by(name='ed').one()
+ assert u2 is u
+ eq_(conn1.execute("select count(1) from users").scalar(), 1)
+ eq_(conn2.execute("select count(1) from users").scalar(), 0)
+ sess.commit()
+ eq_(conn1.execute("select count(1) from users").scalar(), 1)
+ eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
+ sess.close()
+
+ @testing.requires.python26
+ def test_with_no_autoflush(self):
+ User, users = self.classes.User, self.tables.users
+
+ mapper(User, users)
+ sess = Session()
+
+ u = User()
+ u.name = 'ed'
+ sess.add(u)
+ def go(obj):
+ assert u not in sess.query(User).all()
+ testing.run_as_contextmanager(sess.no_autoflush, go)
+ assert u in sess.new
+ assert u in sess.query(User).all()
+ assert u not in sess.new
+
+
def test_deleted_flag(self):
users, User = self.tables.users, self.classes.User
@@ -431,131 +565,6 @@ class SessionTest(_fixtures.FixtureTest):
sess.rollback()
assert not sess.is_active
- def test_textual_execute(self):
- """test that Session.execute() converts to text()"""
-
- users = self.tables.users
-
-
- sess = create_session(bind=self.metadata.bind)
- users.insert().execute(id=7, name='jack')
-
- # use :bindparam style
- eq_(sess.execute("select * from users where id=:id",
- {'id':7}).fetchall(),
- [(7, u'jack')])
-
-
- # use :bindparam style
- eq_(sess.scalar("select id from users where id=:id",
- {'id':7}),
- 7)
-
- def test_parameter_execute(self):
- users = self.tables.users
- sess = Session(bind=testing.db)
- sess.execute(users.insert(), [
- {"id": 7, "name": "u7"},
- {"id": 8, "name": "u8"}
- ]
- )
- sess.execute(users.insert(), {"id": 9, "name": "u9"})
- eq_(
- sess.execute(sa.select([users.c.id]).\
- order_by(users.c.id)).fetchall(),
- [(7, ), (8, ), (9, )]
- )
-
-
- @engines.close_open_connections
- def test_bound_connection(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
- c = testing.db.connect()
- sess = create_session(bind=c)
- sess.begin()
- transaction = sess.transaction
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- assert transaction._connection_for_bind(testing.db) \
- is transaction._connection_for_bind(c) is c
-
- assert_raises_message(sa.exc.InvalidRequestError,
- 'Session already has a Connection '
- 'associated',
- transaction._connection_for_bind,
- testing.db.connect())
- transaction.rollback()
- assert len(sess.query(User).all()) == 0
- sess.close()
-
- def test_bound_connection_transactional(self):
- User, users = self.classes.User, self.tables.users
-
- mapper(User, users)
- c = testing.db.connect()
-
- sess = create_session(bind=c, autocommit=False)
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- sess.close()
- assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 0
-
- sess = create_session(bind=c, autocommit=False)
- u = User(name='u2')
- sess.add(u)
- sess.flush()
- sess.commit()
- assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 1
- c.execute("delete from users")
- assert c.scalar("select count(1) from users") == 0
-
- c = testing.db.connect()
-
- trans = c.begin()
- sess = create_session(bind=c, autocommit=True)
- u = User(name='u3')
- sess.add(u)
- sess.flush()
- assert c.in_transaction()
- trans.commit()
- assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 1
-
- def test_bind_arguments(self):
- users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
-
- mapper(User, users)
- mapper(Address, addresses)
-
- e1 = engines.testing_engine()
- e2 = engines.testing_engine()
- e3 = engines.testing_engine()
-
- sess = Session(e3)
- sess.bind_mapper(User, e1)
- sess.bind_mapper(Address, e2)
-
- assert sess.connection().engine is e3
- assert sess.connection(bind=e1).engine is e1
- assert sess.connection(mapper=Address, bind=e1).engine is e1
- assert sess.connection(mapper=Address).engine is e2
- assert sess.connection(clause=addresses.select()).engine is e2
- assert sess.connection(mapper=User,
- clause=addresses.select()).engine is e1
- assert sess.connection(mapper=User,
- clause=addresses.select(),
- bind=e2).engine is e2
-
- sess.close()
@engines.close_open_connections
def test_add_delete(self):
@@ -617,6 +626,180 @@ class SessionTest(_fixtures.FixtureTest):
assert user not in s
assert s.query(User).count() == 0
+
+ @testing.uses_deprecated()
+ def test_identity_conflict(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+ for s in (
+ create_session(),
+ create_session(weak_identity_map=False),
+ ):
+ users.delete().execute()
+ u1 = User(name="ed")
+ s.add(u1)
+ s.flush()
+ s.expunge(u1)
+ u2 = s.query(User).first()
+ s.expunge(u2)
+ s.identity_map.add(sa.orm.attributes.instance_state(u1))
+
+ assert_raises(AssertionError, s.identity_map.add,
+ sa.orm.attributes.instance_state(u2))
+
+ def test_pickled_update(self):
+ users, User = self.tables.users, pickleable.User
+
+ mapper(User, users)
+ sess1 = create_session()
+ sess2 = create_session()
+ u1 = User(name='u1')
+ sess1.add(u1)
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'already attached to session', sess2.add,
+ u1)
+ u2 = pickle.loads(pickle.dumps(u1))
+ sess2.add(u2)
+
+ def test_duplicate_update(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+ Session = sessionmaker()
+ sess = Session()
+
+ u1 = User(name='u1')
+ sess.add(u1)
+ sess.flush()
+ assert u1.id is not None
+
+ sess.expunge(u1)
+
+ assert u1 not in sess
+ assert Session.object_session(u1) is None
+
+ u2 = sess.query(User).get(u1.id)
+ assert u2 is not None and u2 is not u1
+ assert u2 in sess
+
+ assert_raises(Exception, lambda: sess.add(u1))
+
+ sess.expunge(u2)
+ assert u2 not in sess
+ assert Session.object_session(u2) is None
+
+ u1.name = "John"
+ u2.name = "Doe"
+
+ sess.add(u1)
+ assert u1 in sess
+ assert Session.object_session(u1) is sess
+
+ sess.flush()
+
+ sess.expunge_all()
+
+ u3 = sess.query(User).get(u1.id)
+ assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
+
+ def test_no_double_save(self):
+ users = self.tables.users
+
+ sess = create_session()
+ class Foo(object):
+ def __init__(self):
+ sess.add(self)
+ class Bar(Foo):
+ def __init__(self):
+ sess.add(self)
+ Foo.__init__(self)
+ mapper(Foo, users)
+ mapper(Bar, users)
+
+ b = Bar()
+ assert b in sess
+ assert len(list(sess)) == 1
+
+ def test_identity_map_mutate(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ sess = Session()
+
+ sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
+ sess.commit()
+
+ # TODO: what are we testing here ? that iteritems() can
+ # withstand a change? should this be
+ # more directly attempting to manipulate the identity_map ?
+ u1, u2, u3 = sess.query(User).all()
+ for i, (key, value) in enumerate(sess.identity_map.iteritems()):
+ if i == 2:
+ del u3
+ gc_collect()
+
+class SessionStateWFixtureTest(_fixtures.FixtureTest):
+
+ def test_autoflush_rollback(self):
+ Address, addresses, users, User = (self.classes.Address,
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
+
+ mapper(Address, addresses)
+ mapper(User, users, properties={
+ 'addresses':relationship(Address)})
+
+ sess = create_session(autocommit=False, autoflush=True)
+ u = sess.query(User).get(8)
+ newad = Address(email_address='a new address')
+ u.addresses.append(newad)
+ u.name = 'some new name'
+ assert u.name == 'some new name'
+ assert len(u.addresses) == 4
+ assert newad in u.addresses
+ sess.rollback()
+ assert u.name == 'ed'
+ assert len(u.addresses) == 3
+
+ assert newad not in u.addresses
+ # pending objects dont get expired
+ assert newad.email_address == 'a new address'
+
+ def test_expunge_cascade(self):
+ Address, addresses, users, User = (self.classes.Address,
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
+
+ mapper(Address, addresses)
+ mapper(User, users, properties={
+ 'addresses':relationship(Address,
+ backref=backref("user", cascade="all"),
+ cascade="all")})
+
+ session = create_session()
+ u = session.query(User).filter_by(id=7).one()
+
+ # get everything to load in both directions
+ print [a.user for a in u.addresses]
+
+ # then see if expunge fails
+ session.expunge(u)
+
+ assert sa.orm.object_session(u) is None
+ assert sa.orm.attributes.instance_state(u).session_id is None
+ for a in u.addresses:
+ assert sa.orm.object_session(a) is None
+ assert sa.orm.attributes.instance_state(a).session_id is None
+
+
+
+class WeakIdentityMapTest(_fixtures.FixtureTest):
+ run_inserts = None
+
@testing.requires.predictable_gc
def test_weakref(self):
"""test the weak-referencing identity map, which strongly-
@@ -685,28 +868,6 @@ class SessionTest(_fixtures.FixtureTest):
assert not s.identity_map
- @testing.uses_deprecated()
- def test_identity_conflict(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
- for s in (
- create_session(),
- create_session(weak_identity_map=False),
- ):
- users.delete().execute()
- u1 = User(name="ed")
- s.add(u1)
- s.flush()
- s.expunge(u1)
- u2 = s.query(User).first()
- s.expunge(u2)
- s.identity_map.add(sa.orm.attributes.instance_state(u1))
-
- assert_raises(AssertionError, s.identity_map.add,
- sa.orm.attributes.instance_state(u2))
-
-
@testing.requires.predictable_gc
def test_weakref_with_cycles_o2m(self):
Address, addresses, users, User = (self.classes.Address,
@@ -776,6 +937,39 @@ class SessionTest(_fixtures.FixtureTest):
user = s.query(User).options(joinedload(User.address)).one()
eq_(user, User(name="ed", address=Address(email_address="ed2")))
+ def test_auto_detach_on_gc_session(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ sess = Session()
+
+ u1 = User(name='u1')
+ sess.add(u1)
+ sess.commit()
+
+ # can't add u1 to Session,
+ # already belongs to u2
+ s2 = Session()
+ assert_raises_message(
+ sa.exc.InvalidRequestError,
+ r".*is already attached to session",
+ s2.add, u1
+ )
+
+ # garbage collect sess
+ del sess
+ gc_collect()
+
+ # s2 lets it in now despite u1 having
+ # session_key
+ s2.add(u1)
+ assert u1 in s2
+
+
+class StrongIdentityMapTest(_fixtures.FixtureTest):
+ run_inserts = None
+
@testing.uses_deprecated()
def test_strong_ref(self):
users, User = self.tables.users, self.classes.User
@@ -853,178 +1047,6 @@ class SessionTest(_fixtures.FixtureTest):
self.assert_(len(s.identity_map) == 0)
- def test_pickled_update(self):
- users, User = self.tables.users, pickleable.User
-
- mapper(User, users)
- sess1 = create_session()
- sess2 = create_session()
- u1 = User(name='u1')
- sess1.add(u1)
- assert_raises_message(sa.exc.InvalidRequestError,
- 'already attached to session', sess2.add,
- u1)
- u2 = pickle.loads(pickle.dumps(u1))
- sess2.add(u2)
-
- def test_duplicate_update(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
- Session = sessionmaker()
- sess = Session()
-
- u1 = User(name='u1')
- sess.add(u1)
- sess.flush()
- assert u1.id is not None
-
- sess.expunge(u1)
-
- assert u1 not in sess
- assert Session.object_session(u1) is None
-
- u2 = sess.query(User).get(u1.id)
- assert u2 is not None and u2 is not u1
- assert u2 in sess
-
- assert_raises(Exception, lambda: sess.add(u1))
-
- sess.expunge(u2)
- assert u2 not in sess
- assert Session.object_session(u2) is None
-
- u1.name = "John"
- u2.name = "Doe"
-
- sess.add(u1)
- assert u1 in sess
- assert Session.object_session(u1) is sess
-
- sess.flush()
-
- sess.expunge_all()
-
- u3 = sess.query(User).get(u1.id)
- assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
-
- def test_no_double_save(self):
- users = self.tables.users
-
- sess = create_session()
- class Foo(object):
- def __init__(self):
- sess.add(self)
- class Bar(Foo):
- def __init__(self):
- sess.add(self)
- Foo.__init__(self)
- mapper(Foo, users)
- mapper(Bar, users)
-
- b = Bar()
- assert b in sess
- assert len(list(sess)) == 1
-
- def test_identity_map_mutate(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
-
- sess = Session()
-
- sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
- sess.commit()
-
- u1, u2, u3 = sess.query(User).all()
- for i, (key, value) in enumerate(sess.identity_map.iteritems()):
- if i == 2:
- del u3
- gc_collect()
-
- def test_auto_detach_on_gc_session(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
-
- sess = Session()
-
- u1 = User(name='u1')
- sess.add(u1)
- sess.commit()
-
- # can't add u1 to Session,
- # already belongs to u2
- s2 = Session()
- assert_raises_message(
- sa.exc.InvalidRequestError,
- r".*is already attached to session",
- s2.add, u1
- )
-
- # garbage collect sess
- del sess
- gc_collect()
-
- # s2 lets it in now despite u1 having
- # session_key
- s2.add(u1)
- assert u1 in s2
-
-class SessionDataTest(_fixtures.FixtureTest):
- def test_expunge_cascade(self):
- Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
- mapper(Address, addresses)
- mapper(User, users, properties={
- 'addresses':relationship(Address,
- backref=backref("user", cascade="all"),
- cascade="all")})
-
- session = create_session()
- u = session.query(User).filter_by(id=7).one()
-
- # get everything to load in both directions
- print [a.user for a in u.addresses]
-
- # then see if expunge fails
- session.expunge(u)
-
- assert sa.orm.object_session(u) is None
- assert sa.orm.attributes.instance_state(u).session_id is None
- for a in u.addresses:
- assert sa.orm.object_session(a) is None
- assert sa.orm.attributes.instance_state(a).session_id is None
-
- def test_autoflush_rollback(self):
- Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
- mapper(Address, addresses)
- mapper(User, users, properties={
- 'addresses':relationship(Address)})
-
- sess = create_session(autocommit=False, autoflush=True)
- u = sess.query(User).get(8)
- newad = Address(email_address='a new address')
- u.addresses.append(newad)
- u.name = 'some new name'
- assert u.name == 'some new name'
- assert len(u.addresses) == 4
- assert newad in u.addresses
- sess.rollback()
- assert u.name == 'ed'
- assert len(u.addresses) == 3
-
- assert newad not in u.addresses
- # pending objects dont get expired
- assert newad.email_address == 'a new address'
-
class IsModifiedTest(_fixtures.FixtureTest):
run_inserts = None