summaryrefslogtreecommitdiff
path: root/test/orm
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2007-11-18 02:13:56 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2007-11-18 02:13:56 +0000
commit622a26a6551a3580c844b634519ab963c7f35aaf (patch)
tree24f6a07624805a26a8b7ae689df9181d41d19067 /test/orm
parente076169d390df8a9e90aa46053db34fd5815598a (diff)
downloadsqlalchemy-622a26a6551a3580c844b634519ab963c7f35aaf.tar.gz
- session.refresh() and session.expire() now support an additional argument
"attribute_names", a list of individual attribute keynames to be refreshed or expired, allowing partial reloads of attributes on an already-loaded instance. - finally simplified the behavior of deferred attributes, deferred polymorphic load, session.refresh, session.expire, mapper._postfetch to all use a single codepath through query._get(), which now supports a list of individual attribute names to be refreshed. the *one* exception still remaining is mapper._get_poly_select_loader(), which may stay that way since its inline with an already processing load operation. otherwise, query._get() is the single place that all "load this instance's row" operation proceeds. - cleanup all over the place
Diffstat (limited to 'test/orm')
-rw-r--r--test/orm/alltests.py1
-rw-r--r--test/orm/attributes.py40
-rw-r--r--test/orm/dynamic.py8
-rw-r--r--test/orm/eager_relations.py5
-rw-r--r--test/orm/expire.py439
-rw-r--r--test/orm/inheritance/basic.py8
-rw-r--r--test/orm/lazy_relations.py15
-rw-r--r--test/orm/mapper.py133
-rw-r--r--test/orm/query.py7
-rw-r--r--test/orm/unitofwork.py2
10 files changed, 492 insertions, 166 deletions
diff --git a/test/orm/alltests.py b/test/orm/alltests.py
index 59357c7b7..059d7a100 100644
--- a/test/orm/alltests.py
+++ b/test/orm/alltests.py
@@ -11,6 +11,7 @@ def suite():
'orm.lazy_relations',
'orm.eager_relations',
'orm.mapper',
+ 'orm.expire',
'orm.selectable',
'orm.collection',
'orm.generative',
diff --git a/test/orm/attributes.py b/test/orm/attributes.py
index 930bfa57e..2080474ed 100644
--- a/test/orm/attributes.py
+++ b/test/orm/attributes.py
@@ -5,6 +5,8 @@ from sqlalchemy.orm.collections import collection
from sqlalchemy import exceptions
from testlib import *
+ROLLBACK_SUPPORTED=False
+
# these test classes defined at the module
# level to support pickling
class MyTest(object):pass
@@ -29,7 +31,7 @@ class AttributesTest(PersistTest):
print repr(u.__dict__)
self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')
- manager.commit(u)
+ u._state.commit_all()
print repr(u.__dict__)
self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')
@@ -37,10 +39,11 @@ class AttributesTest(PersistTest):
u.email_address = 'foo@bar.com'
print repr(u.__dict__)
self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.email_address == 'foo@bar.com')
-
- manager.rollback(u)
- print repr(u.__dict__)
- self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')
+
+ if ROLLBACK_SUPPORTED:
+ manager.rollback(u)
+ print repr(u.__dict__)
+ self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')
def test_pickleness(self):
@@ -128,7 +131,7 @@ class AttributesTest(PersistTest):
print repr(u.__dict__)
self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')
- manager.commit(u, a)
+ u, a._state.commit_all()
print repr(u.__dict__)
self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')
@@ -140,11 +143,12 @@ class AttributesTest(PersistTest):
print repr(u.__dict__)
self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.addresses[0].email_address == 'lala@123.com' and u.addresses[1].email_address == 'foo@bar.com')
- manager.rollback(u, a)
- print repr(u.__dict__)
- print repr(u.addresses[0].__dict__)
- self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')
- self.assert_(len(manager.get_history(u, 'addresses').unchanged_items()) == 1)
+ if ROLLBACK_SUPPORTED:
+ manager.rollback(u, a)
+ print repr(u.__dict__)
+ print repr(u.addresses[0].__dict__)
+ self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')
+ self.assert_(len(manager.get_history(u, 'addresses').unchanged_items()) == 1)
def test_backref(self):
class Student(object):pass
@@ -231,9 +235,9 @@ class AttributesTest(PersistTest):
# create objects as if they'd been freshly loaded from the database (without history)
b = Blog()
p1 = Post()
- manager.init_instance_attribute(b, 'posts', lambda:[p1])
- manager.init_instance_attribute(p1, 'blog', lambda:b)
- manager.commit(p1, b)
+ b._state.set_callable('posts', lambda:[p1])
+ p1._state.set_callable('blog', lambda:b)
+ p1, b._state.commit_all()
# no orphans (called before the lazy loaders fire off)
assert manager.has_parent(Blog, p1, 'posts', optimistic=True)
@@ -292,7 +296,7 @@ class AttributesTest(PersistTest):
x.element = 'this is the element'
hist = manager.get_history(x, 'element')
assert hist.added_items() == ['this is the element']
- manager.commit(x)
+ x._state.commit_all()
hist = manager.get_history(x, 'element')
assert hist.added_items() == []
assert hist.unchanged_items() == ['this is the element']
@@ -320,7 +324,7 @@ class AttributesTest(PersistTest):
manager.register_attribute(Bar, 'id', uselist=False, useobject=True)
x = Foo()
- manager.commit(x)
+ x._state.commit_all()
x.col2.append(Bar(4))
h = manager.get_history(x, 'col2')
print h.added_items()
@@ -362,7 +366,7 @@ class AttributesTest(PersistTest):
manager.register_attribute(Foo, 'element', uselist=False, copy_function=lambda x:[y for y in x], mutable_scalars=True, useobject=False)
x = Foo()
x.element = ['one', 'two', 'three']
- manager.commit(x)
+ x._state.commit_all()
x.element[1] = 'five'
assert manager.is_modified(x)
@@ -372,7 +376,7 @@ class AttributesTest(PersistTest):
manager.register_attribute(Foo, 'element', uselist=False, useobject=False)
x = Foo()
x.element = ['one', 'two', 'three']
- manager.commit(x)
+ x._state.commit_all()
x.element[1] = 'five'
assert not manager.is_modified(x)
diff --git a/test/orm/dynamic.py b/test/orm/dynamic.py
index 80e3982da..0bb253b19 100644
--- a/test/orm/dynamic.py
+++ b/test/orm/dynamic.py
@@ -7,12 +7,10 @@ from testlib.fixtures import *
from query import QueryTest
-class DynamicTest(QueryTest):
+class DynamicTest(FixtureTest):
keep_mappers = False
-
- def setup_mappers(self):
- pass
-
+ keep_data = True
+
def test_basic(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses))
diff --git a/test/orm/eager_relations.py b/test/orm/eager_relations.py
index 3e811b86b..52602ecae 100644
--- a/test/orm/eager_relations.py
+++ b/test/orm/eager_relations.py
@@ -7,9 +7,10 @@ from testlib import *
from testlib.fixtures import *
from query import QueryTest
-class EagerTest(QueryTest):
+class EagerTest(FixtureTest):
keep_mappers = False
-
+ keep_data = True
+
def setup_mappers(self):
pass
diff --git a/test/orm/expire.py b/test/orm/expire.py
new file mode 100644
index 000000000..430117725
--- /dev/null
+++ b/test/orm/expire.py
@@ -0,0 +1,439 @@
+"""test attribute/instance expiration, deferral of attributes, etc."""
+
+import testbase
+from sqlalchemy import *
+from sqlalchemy import exceptions
+from sqlalchemy.orm import *
+from testlib import *
+from testlib.fixtures import *
+
+class ExpireTest(FixtureTest):
+ keep_mappers = False
+ refresh_data = True
+
+ def test_expire(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user'),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u = sess.query(User).get(7)
+ assert len(u.addresses) == 1
+ u.name = 'foo'
+ del u.addresses[0]
+ sess.expire(u)
+
+ assert 'name' not in u.__dict__
+
+ def go():
+ assert u.name == 'jack'
+ self.assert_sql_count(testbase.db, go, 1)
+ assert 'name' in u.__dict__
+
+ # we're changing the database here, so if this test fails in the middle,
+ # it'll screw up the other tests which are hardcoded to 7/'jack'
+ u.name = 'foo'
+ sess.flush()
+ # change the value in the DB
+ users.update(users.c.id==7, values=dict(name='jack')).execute()
+ sess.expire(u)
+ # object isnt refreshed yet, using dict to bypass trigger
+ assert u.__dict__.get('name') != 'jack'
+ # reload all
+ sess.query(User).all()
+ # test that it refreshed
+ assert u.__dict__['name'] == 'jack'
+
+ # object should be back to normal now,
+ # this should *not* produce a SELECT statement (not tested here though....)
+ assert u.name == 'jack'
+
+ def test_expire_doesntload_on_set(self):
+ mapper(User, users)
+
+ sess = create_session()
+ u = sess.query(User).get(7)
+
+ sess.expire(u, attribute_names=['name'])
+ def go():
+ u.name = 'somenewname'
+ self.assert_sql_count(testbase.db, go, 0)
+ sess.flush()
+ sess.clear()
+ assert sess.query(User).get(7).name == 'somenewname'
+
+ def test_expire_committed(self):
+ """test that the committed state of the attribute receives the most recent DB data"""
+ mapper(Order, orders)
+
+ sess = create_session()
+ o = sess.query(Order).get(3)
+ sess.expire(o)
+
+ orders.update(id=3).execute(description='order 3 modified')
+ assert o.isopen == 1
+ assert o._state.committed_state['description'] == 'order 3 modified'
+ def go():
+ sess.flush()
+ self.assert_sql_count(testbase.db, go, 0)
+
+ def test_expire_cascade(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, cascade="all, refresh-expire")
+ })
+ mapper(Address, addresses)
+ s = create_session()
+ u = s.get(User, 8)
+ assert u.addresses[0].email_address == 'ed@wood.com'
+
+ u.addresses[0].email_address = 'someotheraddress'
+ s.expire(u)
+ u.name
+ print u._state.dict
+ assert u.addresses[0].email_address == 'ed@wood.com'
+
+ def test_expired_lazy(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user'),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u = sess.query(User).get(7)
+
+ sess.expire(u)
+ assert 'name' not in u.__dict__
+ assert 'addresses' not in u.__dict__
+
+ def go():
+ assert u.addresses[0].email_address == 'jack@bean.com'
+ assert u.name == 'jack'
+ # two loads
+ self.assert_sql_count(testbase.db, go, 2)
+ assert 'name' in u.__dict__
+ assert 'addresses' in u.__dict__
+
+ def test_expired_eager(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user', lazy=False),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u = sess.query(User).get(7)
+
+ sess.expire(u)
+ assert 'name' not in u.__dict__
+ assert 'addresses' not in u.__dict__
+
+ def go():
+ assert u.addresses[0].email_address == 'jack@bean.com'
+ assert u.name == 'jack'
+ # one load
+ self.assert_sql_count(testbase.db, go, 1)
+ assert 'name' in u.__dict__
+ assert 'addresses' in u.__dict__
+
+ def test_partial_expire(self):
+ mapper(Order, orders)
+
+ sess = create_session()
+ o = sess.query(Order).get(3)
+
+ sess.expire(o, attribute_names=['description'])
+ assert 'id' in o.__dict__
+ assert 'description' not in o.__dict__
+ assert o._state.committed_state['isopen'] == 1
+
+ orders.update(orders.c.id==3).execute(description='order 3 modified')
+
+ def go():
+ assert o.description == 'order 3 modified'
+ self.assert_sql_count(testbase.db, go, 1)
+ assert o._state.committed_state['description'] == 'order 3 modified'
+
+ o.isopen = 5
+ sess.expire(o, attribute_names=['description'])
+ assert 'id' in o.__dict__
+ assert 'description' not in o.__dict__
+ assert o.__dict__['isopen'] == 5
+ assert o._state.committed_state['isopen'] == 1
+
+ def go():
+ assert o.description == 'order 3 modified'
+ self.assert_sql_count(testbase.db, go, 1)
+ assert o.__dict__['isopen'] == 5
+ assert o._state.committed_state['description'] == 'order 3 modified'
+ assert o._state.committed_state['isopen'] == 1
+
+ sess.flush()
+
+ sess.expire(o, attribute_names=['id', 'isopen', 'description'])
+ assert 'id' not in o.__dict__
+ assert 'isopen' not in o.__dict__
+ assert 'description' not in o.__dict__
+ def go():
+ assert o.description == 'order 3 modified'
+ assert o.id == 3
+ assert o.isopen == 5
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_partial_expire_lazy(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user'),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u = sess.query(User).get(8)
+
+ sess.expire(u, ['name', 'addresses'])
+ assert 'name' not in u.__dict__
+ assert 'addresses' not in u.__dict__
+
+ # hit the lazy loader. just does the lazy load,
+ # doesnt do the overall refresh
+ def go():
+ assert u.addresses[0].email_address=='ed@wood.com'
+ self.assert_sql_count(testbase.db, go, 1)
+
+ assert 'name' not in u.__dict__
+
+ # check that mods to expired lazy-load attributes
+ # only do the lazy load
+ sess.expire(u, ['name', 'addresses'])
+ def go():
+ u.addresses = [Address(id=10, email_address='foo@bar.com')]
+ self.assert_sql_count(testbase.db, go, 1)
+
+ sess.flush()
+
+ # flush has occurred, and addresses was modified,
+ # so the addresses collection got committed and is
+ # longer expired
+ def go():
+ assert u.addresses[0].email_address=='foo@bar.com'
+ assert len(u.addresses) == 1
+ self.assert_sql_count(testbase.db, go, 0)
+
+ # but the name attribute was never loaded and so
+ # still loads
+ def go():
+ assert u.name == 'ed'
+ self.assert_sql_count(testbase.db, go, 1)
+
+ def test_partial_expire_eager(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user', lazy=False),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u = sess.query(User).get(8)
+
+ sess.expire(u, ['name', 'addresses'])
+ assert 'name' not in u.__dict__
+ assert 'addresses' not in u.__dict__
+
+ def go():
+ assert u.addresses[0].email_address=='ed@wood.com'
+ self.assert_sql_count(testbase.db, go, 1)
+
+ # check that mods to expired eager-load attributes
+ # do the refresh
+ sess.expire(u, ['name', 'addresses'])
+ def go():
+ u.addresses = [Address(id=10, email_address='foo@bar.com')]
+ self.assert_sql_count(testbase.db, go, 1)
+ sess.flush()
+
+ # this should ideally trigger the whole load
+ # but currently it works like the lazy case
+ def go():
+ assert u.addresses[0].email_address=='foo@bar.com'
+ assert len(u.addresses) == 1
+ self.assert_sql_count(testbase.db, go, 0)
+
+ def go():
+ assert u.name == 'ed'
+ # scalar attributes have their own load
+ self.assert_sql_count(testbase.db, go, 1)
+ # ideally, this was already loaded, but we arent
+ # doing it that way right now
+ #self.assert_sql_count(testbase.db, go, 0)
+
+ def test_partial_expire_deferred(self):
+ mapper(Order, orders, properties={
+ 'description':deferred(orders.c.description)
+ })
+
+ sess = create_session()
+ o = sess.query(Order).get(3)
+ sess.expire(o, ['description', 'isopen'])
+ assert 'isopen' not in o.__dict__
+ assert 'description' not in o.__dict__
+
+ # test that expired attribute access refreshes
+ # the deferred
+ def go():
+ assert o.isopen == 1
+ assert o.description == 'order 3'
+ self.assert_sql_count(testbase.db, go, 1)
+
+ sess.expire(o, ['description', 'isopen'])
+ assert 'isopen' not in o.__dict__
+ assert 'description' not in o.__dict__
+ # test that the deferred attribute triggers the full
+ # reload
+ def go():
+ assert o.description == 'order 3'
+ assert o.isopen == 1
+ self.assert_sql_count(testbase.db, go, 1)
+
+ clear_mappers()
+
+ mapper(Order, orders)
+ sess.clear()
+
+ # same tests, using deferred at the options level
+ o = sess.query(Order).options(defer('description')).get(3)
+
+ assert 'description' not in o.__dict__
+
+ # sanity check
+ def go():
+ assert o.description == 'order 3'
+ self.assert_sql_count(testbase.db, go, 1)
+
+ assert 'description' in o.__dict__
+ assert 'isopen' in o.__dict__
+ sess.expire(o, ['description', 'isopen'])
+ assert 'isopen' not in o.__dict__
+ assert 'description' not in o.__dict__
+
+ # test that expired attribute access refreshes
+ # the deferred
+ def go():
+ assert o.isopen == 1
+ assert o.description == 'order 3'
+ self.assert_sql_count(testbase.db, go, 1)
+ sess.expire(o, ['description', 'isopen'])
+
+ assert 'isopen' not in o.__dict__
+ assert 'description' not in o.__dict__
+ # test that the deferred attribute triggers the full
+ # reload
+ def go():
+ assert o.description == 'order 3'
+ assert o.isopen == 1
+ self.assert_sql_count(testbase.db, go, 1)
+
+
+class RefreshTest(FixtureTest):
+ keep_mappers = False
+ refresh_data = True
+
+ def test_refresh(self):
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), backref='user')
+ })
+ s = create_session()
+ u = s.get(User, 7)
+ u.name = 'foo'
+ a = Address()
+ assert object_session(a) is None
+ u.addresses.append(a)
+ assert a.email_address is None
+ assert id(a) in [id(x) for x in u.addresses]
+
+ s.refresh(u)
+
+ # its refreshed, so not dirty
+ assert u not in s.dirty
+
+ # username is back to the DB
+ assert u.name == 'jack'
+
+ assert id(a) not in [id(x) for x in u.addresses]
+
+ u.name = 'foo'
+ u.addresses.append(a)
+ # now its dirty
+ assert u in s.dirty
+ assert u.name == 'foo'
+ assert id(a) in [id(x) for x in u.addresses]
+ s.expire(u)
+
+ # get the attribute, it refreshes
+ assert u.name == 'jack'
+ assert id(a) not in [id(x) for x in u.addresses]
+
+ def test_refresh_expired(self):
+ mapper(User, users)
+ s = create_session()
+ u = s.get(User, 7)
+ s.expire(u)
+ assert 'name' not in u.__dict__
+ s.refresh(u)
+ assert u.name == 'jack'
+
+ def test_refresh_with_lazy(self):
+ """test that when a lazy loader is set as a trigger on an object's attribute
+ (at the attribute level, not the class level), a refresh() operation doesnt
+ fire the lazy loader or create any problems"""
+
+ s = create_session()
+ mapper(User, users, properties={'addresses':relation(mapper(Address, addresses))})
+ q = s.query(User).options(lazyload('addresses'))
+ u = q.filter(users.c.id==8).first()
+ def go():
+ s.refresh(u)
+ self.assert_sql_count(testbase.db, go, 1)
+
+
+ def test_refresh_with_eager(self):
+ """test that a refresh/expire operation loads rows properly and sends correct "isnew" state to eager loaders"""
+
+ mapper(User, users, properties={
+ 'addresses':relation(mapper(Address, addresses), lazy=False)
+ })
+
+ s = create_session()
+ u = s.get(User, 8)
+ assert len(u.addresses) == 3
+ s.refresh(u)
+ assert len(u.addresses) == 3
+
+ s = create_session()
+ u = s.get(User, 8)
+ assert len(u.addresses) == 3
+ s.expire(u)
+ assert len(u.addresses) == 3
+
+ @testing.fails_on('maxdb')
+ def test_refresh2(self):
+ """test a hang condition that was occuring on expire/refresh"""
+
+ s = create_session()
+ mapper(Address, addresses)
+
+ mapper(User, users, properties = dict(addresses=relation(Address,cascade="all, delete-orphan",lazy=False)) )
+
+ u=User()
+ u.name='Justin'
+ a = Address(id=10, email_address='lala')
+ u.addresses.append(a)
+
+ s.save(u)
+ s.flush()
+ s.clear()
+ u = s.query(User).filter(User.name=='Justin').one()
+
+ s.expire(u)
+ assert u.name == 'Justin'
+
+ s.refresh(u)
+
+if __name__ == '__main__':
+ testbase.main()
diff --git a/test/orm/inheritance/basic.py b/test/orm/inheritance/basic.py
index 9fa7fffba..32420300f 100644
--- a/test/orm/inheritance/basic.py
+++ b/test/orm/inheritance/basic.py
@@ -84,7 +84,7 @@ class GetTest(ORMTest):
Column('bar_id', Integer, ForeignKey('bar.id')),
Column('data', String(20)))
- def create_test(polymorphic):
+ def create_test(polymorphic, name):
def test_get(self):
class Foo(object):
pass
@@ -145,11 +145,11 @@ class GetTest(ORMTest):
assert sess.query(Blub).get(bl.id) == bl
self.assert_sql_count(testbase.db, go, 3)
-
+ test_get.__name__ = name
return test_get
- test_get_polymorphic = create_test(True)
- test_get_nonpolymorphic = create_test(False)
+ test_get_polymorphic = create_test(True, 'test_get_polymorphic')
+ test_get_nonpolymorphic = create_test(False, 'test_get_nonpolymorphic')
class ConstructionTest(ORMTest):
diff --git a/test/orm/lazy_relations.py b/test/orm/lazy_relations.py
index b8e92c163..97eda3006 100644
--- a/test/orm/lazy_relations.py
+++ b/test/orm/lazy_relations.py
@@ -8,12 +8,10 @@ from testlib import *
from testlib.fixtures import *
from query import QueryTest
-class LazyTest(QueryTest):
+class LazyTest(FixtureTest):
keep_mappers = False
-
- def setup_mappers(self):
- pass
-
+ keep_data = True
+
def test_basic(self):
mapper(User, users, properties={
'addresses':relation(mapper(Address, addresses), lazy=True)
@@ -275,13 +273,10 @@ class LazyTest(QueryTest):
assert a.user is u1
-class M2OGetTest(QueryTest):
+class M2OGetTest(FixtureTest):
keep_mappers = False
- keep_data = False
+ keep_data = True
- def setup_mappers(self):
- pass
-
def test_m2o_noload(self):
"""test that a NULL foreign key doesn't trigger a lazy load"""
mapper(User, users)
diff --git a/test/orm/mapper.py b/test/orm/mapper.py
index fe763c0be..5ed479594 100644
--- a/test/orm/mapper.py
+++ b/test/orm/mapper.py
@@ -67,67 +67,12 @@ class MapperTest(MapperSuperTest):
u2 = s.query(User).filter_by(user_name='jack').one()
assert u is u2
- def test_refresh(self):
- mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), backref='user')})
- s = create_session()
- u = s.get(User, 7)
- u.user_name = 'foo'
- a = Address()
- assert object_session(a) is None
- u.addresses.append(a)
-
- self.assert_(a in u.addresses)
-
- s.refresh(u)
-
- # its refreshed, so not dirty
- self.assert_(u not in s.dirty)
-
- # username is back to the DB
- self.assert_(u.user_name == 'jack')
-
- self.assert_(a not in u.addresses)
-
- u.user_name = 'foo'
- u.addresses.append(a)
- # now its dirty
- self.assert_(u in s.dirty)
- self.assert_(u.user_name == 'foo')
- self.assert_(a in u.addresses)
- s.expire(u)
-
- # get the attribute, it refreshes
- self.assert_(u.user_name == 'jack')
- self.assert_(a not in u.addresses)
def test_compileonsession(self):
m = mapper(User, users)
session = create_session()
session.connection(m)
- def test_expirecascade(self):
- mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), cascade="all, refresh-expire")})
- s = create_session()
- u = s.get(User, 8)
- u.addresses[0].email_address = 'someotheraddress'
- s.expire(u)
- assert u.addresses[0].email_address == 'ed@wood.com'
-
- def test_refreshwitheager(self):
- """test that a refresh/expire operation loads rows properly and sends correct "isnew" state to eager loaders"""
- mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), lazy=False)})
- s = create_session()
- u = s.get(User, 8)
- assert len(u.addresses) == 3
- s.refresh(u)
- assert len(u.addresses) == 3
-
- s = create_session()
- u = s.get(User, 8)
- assert len(u.addresses) == 3
- s.expire(u)
- assert len(u.addresses) == 3
-
def test_incompletecolumns(self):
"""test loading from a select which does not contain all columns"""
mapper(Address, addresses)
@@ -186,71 +131,6 @@ class MapperTest(MapperSuperTest):
except Exception, e:
assert e is ex
- def test_refresh_lazy(self):
- """test that when a lazy loader is set as a trigger on an object's attribute (at the attribute level, not the class level), a refresh() operation doesnt fire the lazy loader or create any problems"""
- s = create_session()
- mapper(User, users, properties={'addresses':relation(mapper(Address, addresses))})
- q2 = s.query(User).options(lazyload('addresses'))
- u = q2.selectfirst(users.c.user_id==8)
- def go():
- s.refresh(u)
- self.assert_sql_count(testbase.db, go, 1)
-
- def test_expire(self):
- """test the expire function"""
- s = create_session()
- mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), lazy=False)})
- u = s.get(User, 7)
- assert(len(u.addresses) == 1)
- u.user_name = 'foo'
- del u.addresses[0]
- s.expire(u)
- # test plain expire
- self.assert_(u.user_name =='jack')
- self.assert_(len(u.addresses) == 1)
-
- # we're changing the database here, so if this test fails in the middle,
- # it'll screw up the other tests which are hardcoded to 7/'jack'
- u.user_name = 'foo'
- s.flush()
- # change the value in the DB
- users.update(users.c.user_id==7, values=dict(user_name='jack')).execute()
- s.expire(u)
- # object isnt refreshed yet, using dict to bypass trigger
- self.assert_(u.__dict__.get('user_name') != 'jack')
- # do a select
- s.query(User).select()
- # test that it refreshed
- self.assert_(u.__dict__['user_name'] == 'jack')
-
- # object should be back to normal now,
- # this should *not* produce a SELECT statement (not tested here though....)
- self.assert_(u.user_name =='jack')
-
- @testing.fails_on('maxdb')
- def test_refresh2(self):
- """test a hang condition that was occuring on expire/refresh"""
-
- s = create_session()
- m1 = mapper(Address, addresses)
-
- m2 = mapper(User, users, properties = dict(addresses=relation(Address,private=True,lazy=False)) )
- u=User()
- u.user_name='Justin'
- a = Address()
- a.address_id=17 # to work around the hardcoded IDs in this test suite....
- u.addresses.append(a)
- s.flush()
- s.clear()
- u = s.query(User).selectfirst()
- print u.user_name
-
- #ok so far
- s.expire(u) #hangs when
- print u.user_name #this line runs
-
- s.refresh(u) #hangs
-
def test_props(self):
m = mapper(User, users, properties = {
'addresses' : relation(mapper(Address, addresses))
@@ -299,7 +179,18 @@ class MapperTest(MapperSuperTest):
sess.save(u3)
sess.flush()
sess.rollback()
-
+
+ def test_illegal_non_primary(self):
+ mapper(User, users)
+ mapper(Address, addresses)
+ try:
+ mapper(User, users, non_primary=True, properties={
+ 'addresses':relation(Address)
+ }).compile()
+ assert False
+ except exceptions.ArgumentError, e:
+ assert "Attempting to assign a new relation 'addresses' to a non-primary mapper on class 'User'" in str(e)
+
def test_propfilters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True),
diff --git a/test/orm/query.py b/test/orm/query.py
index 4f19c2c32..438bc9634 100644
--- a/test/orm/query.py
+++ b/test/orm/query.py
@@ -12,16 +12,11 @@ from testlib.fixtures import *
class QueryTest(FixtureTest):
keep_mappers = True
keep_data = True
-
+
def setUpAll(self):
super(QueryTest, self).setUpAll()
- install_fixture_data()
self.setup_mappers()
- def tearDownAll(self):
- clear_mappers()
- super(QueryTest, self).tearDownAll()
-
def setup_mappers(self):
mapper(User, users, properties={
'addresses':relation(Address, backref='user'),
diff --git a/test/orm/unitofwork.py b/test/orm/unitofwork.py
index b985cc8a5..28efbf056 100644
--- a/test/orm/unitofwork.py
+++ b/test/orm/unitofwork.py
@@ -119,8 +119,10 @@ class VersioningTest(ORMTest):
except exceptions.ConcurrentModificationError, e:
assert True
# reload it
+ print "RELOAD"
s1.query(Foo).load(f1s1.id)
# now assert version OK
+ print "VERSIONCHECK"
s1.query(Foo).with_lockmode('read').get(f1s1.id)
# assert brand new load is OK too