summaryrefslogtreecommitdiff
path: root/test/orm/test_dynamic.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm/test_dynamic.py')
-rw-r--r--test/orm/test_dynamic.py561
1 files changed, 561 insertions, 0 deletions
diff --git a/test/orm/test_dynamic.py b/test/orm/test_dynamic.py
new file mode 100644
index 000000000..f2089a435
--- /dev/null
+++ b/test/orm/test_dynamic.py
@@ -0,0 +1,561 @@
+from sqlalchemy.test.testing import eq_
+import operator
+from sqlalchemy.orm import dynamic_loader, backref
+from sqlalchemy.test import testing
+from sqlalchemy import Integer, String, ForeignKey, desc, select, func
+from sqlalchemy.test.schema import Table
+from sqlalchemy.test.schema import Column
+from sqlalchemy.orm import mapper, relation, create_session, Query, attributes
+from sqlalchemy.orm.dynamic import AppenderMixin
+from sqlalchemy.test.testing import eq_
+from sqlalchemy.util import function_named
+from test.orm import _base, _fixtures
+
+
+class DynamicTest(_fixtures.FixtureTest):
+ @testing.resolve_artifact_names
+ def test_basic(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ q = sess.query(User)
+
+ u = q.filter(User.id==7).first()
+ eq_([User(id=7,
+ addresses=[Address(id=1, email_address='jack@bean.com')])],
+ q.filter(User.id==7).all())
+ eq_(self.static.user_address_result, q.all())
+
+ @testing.resolve_artifact_names
+ def test_order_by(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ u = sess.query(User).get(8)
+ eq_(list(u.addresses.order_by(desc(Address.email_address))), [Address(email_address=u'ed@wood.com'), Address(email_address=u'ed@lala.com'), Address(email_address=u'ed@bettyboop.com')])
+
+ @testing.resolve_artifact_names
+ def test_configured_order_by(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=desc(Address.email_address))
+ })
+ sess = create_session()
+ u = sess.query(User).get(8)
+ eq_(list(u.addresses), [Address(email_address=u'ed@wood.com'), Address(email_address=u'ed@lala.com'), Address(email_address=u'ed@bettyboop.com')])
+
+ # test cancellation of None, replacement with something else
+ eq_(
+ list(u.addresses.order_by(None).order_by(Address.email_address)),
+ [Address(email_address=u'ed@bettyboop.com'), Address(email_address=u'ed@lala.com'), Address(email_address=u'ed@wood.com')]
+ )
+
+ # test cancellation of None, replacement with nothing
+ eq_(
+ set(u.addresses.order_by(None)),
+ set([Address(email_address=u'ed@bettyboop.com'), Address(email_address=u'ed@lala.com'), Address(email_address=u'ed@wood.com')])
+ )
+
+ @testing.resolve_artifact_names
+ def test_count(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ u = sess.query(User).first()
+ eq_(u.addresses.count(), 1)
+
+ @testing.resolve_artifact_names
+ def test_backref(self):
+ mapper(Address, addresses, properties={
+ 'user':relation(User, backref=backref('addresses', lazy='dynamic'))
+ })
+ mapper(User, users)
+
+ sess = create_session()
+ ad = sess.query(Address).get(1)
+ def go():
+ ad.user = None
+ self.assert_sql_count(testing.db, go, 1)
+ sess.flush()
+ u = sess.query(User).get(7)
+ assert ad not in u.addresses
+
+ @testing.resolve_artifact_names
+ def test_no_count(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ q = sess.query(User)
+
+ # dynamic collection cannot implement __len__() (at least one that
+ # returns a live database result), else additional count() queries are
+ # issued when evaluating in a list context
+ def go():
+ eq_([User(id=7,
+ addresses=[Address(id=1,
+ email_address='jack@bean.com')])],
+ q.filter(User.id==7).all())
+ self.assert_sql_count(testing.db, go, 2)
+
+ @testing.resolve_artifact_names
+ def test_m2m(self):
+ mapper(Order, orders, properties={
+ 'items':relation(Item, secondary=order_items, lazy="dynamic",
+ backref=backref('orders', lazy="dynamic"))
+ })
+ mapper(Item, items)
+
+ sess = create_session()
+ o1 = Order(id=15, description="order 10")
+ i1 = Item(id=10, description="item 8")
+ o1.items.append(i1)
+ sess.add(o1)
+ sess.flush()
+
+ assert o1 in i1.orders.all()
+ assert i1 in o1.items.all()
+
+ @testing.resolve_artifact_names
+ def test_transient_detached(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ u1 = User()
+ u1.addresses.append(Address())
+ assert u1.addresses.count() == 1
+ assert u1.addresses[0] == Address()
+
+ @testing.resolve_artifact_names
+ def test_custom_query(self):
+ class MyQuery(Query):
+ pass
+
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses),
+ query_class=MyQuery)
+ })
+ sess = create_session()
+ u = User()
+ sess.add(u)
+
+ col = u.addresses
+ assert isinstance(col, Query)
+ assert isinstance(col, MyQuery)
+ assert hasattr(col, 'append')
+ assert type(col).__name__ == 'AppenderMyQuery'
+
+ q = col.limit(1)
+ assert isinstance(q, Query)
+ assert isinstance(q, MyQuery)
+ assert not hasattr(q, 'append')
+ assert type(q).__name__ == 'MyQuery'
+
+ @testing.resolve_artifact_names
+ def test_custom_query_with_custom_mixin(self):
+ class MyAppenderMixin(AppenderMixin):
+ def add(self, items):
+ if isinstance(items, list):
+ for item in items:
+ self.append(item)
+ else:
+ self.append(items)
+
+ class MyQuery(Query):
+ pass
+
+ class MyAppenderQuery(MyAppenderMixin, MyQuery):
+ query_class = MyQuery
+
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses),
+ query_class=MyAppenderQuery)
+ })
+ sess = create_session()
+ u = User()
+ sess.add(u)
+
+ col = u.addresses
+ assert isinstance(col, Query)
+ assert isinstance(col, MyQuery)
+ assert hasattr(col, 'append')
+ assert hasattr(col, 'add')
+ assert type(col).__name__ == 'MyAppenderQuery'
+
+ q = col.limit(1)
+ assert isinstance(q, Query)
+ assert isinstance(q, MyQuery)
+ assert not hasattr(q, 'append')
+ assert not hasattr(q, 'add')
+ assert type(q).__name__ == 'MyQuery'
+
+
+class SessionTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ @testing.resolve_artifact_names
+ def test_events(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ u1 = User(name='jack')
+ a1 = Address(email_address='foo')
+ sess.add_all([u1, a1])
+ sess.flush()
+
+ assert testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)) == 0
+ u1 = sess.query(User).get(u1.id)
+ u1.addresses.append(a1)
+ sess.flush()
+
+ assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
+ (a1.id, u1.id, 'foo')
+ ]
+
+ u1.addresses.remove(a1)
+ sess.flush()
+ assert testing.db.scalar(select([func.count(1)]).where(addresses.c.user_id!=None)) == 0
+
+ u1.addresses.append(a1)
+ sess.flush()
+ assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
+ (a1.id, u1.id, 'foo')
+ ]
+
+ a2= Address(email_address='bar')
+ u1.addresses.remove(a1)
+ u1.addresses.append(a2)
+ sess.flush()
+ assert testing.db.execute(select([addresses]).where(addresses.c.user_id!=None)).fetchall() == [
+ (a2.id, u1.id, 'bar')
+ ]
+
+
+ @testing.resolve_artifact_names
+ def test_merge(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=addresses.c.email_address)
+ })
+ sess = create_session()
+ u1 = User(name='jack')
+ a1 = Address(email_address='a1')
+ a2 = Address(email_address='a2')
+ a3 = Address(email_address='a3')
+
+ u1.addresses.append(a2)
+ u1.addresses.append(a3)
+
+ sess.add_all([u1, a1])
+ sess.flush()
+
+ u1 = User(id=u1.id, name='jack')
+ u1.addresses.append(a1)
+ u1.addresses.append(a3)
+ u1 = sess.merge(u1)
+ assert attributes.get_history(u1, 'addresses') == (
+ [a1],
+ [a3],
+ [a2]
+ )
+
+ sess.flush()
+
+ eq_(
+ list(u1.addresses),
+ [a1, a3]
+ )
+
+ @testing.resolve_artifact_names
+ def test_flush(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session()
+ u1 = User(name='jack')
+ u2 = User(name='ed')
+ u2.addresses.append(Address(email_address='foo@bar.com'))
+ u1.addresses.append(Address(email_address='lala@hoho.com'))
+ sess.add_all((u1, u2))
+ sess.flush()
+
+ from sqlalchemy.orm import attributes
+ eq_(attributes.get_history(attributes.instance_state(u1), 'addresses'), ([], [Address(email_address='lala@hoho.com')], []))
+
+ sess.expunge_all()
+
+ # test the test fixture a little bit
+ assert User(name='jack', addresses=[Address(email_address='wrong')]) != sess.query(User).first()
+ assert User(name='jack', addresses=[Address(email_address='lala@hoho.com')]) == sess.query(User).first()
+
+ assert [
+ User(name='jack', addresses=[Address(email_address='lala@hoho.com')]),
+ User(name='ed', addresses=[Address(email_address='foo@bar.com')])
+ ] == sess.query(User).all()
+
+ @testing.resolve_artifact_names
+ def test_hasattr(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ u1 = User(name='jack')
+
+ assert 'addresses' not in u1.__dict__.keys()
+ u1.addresses = [Address(email_address='test')]
+ assert 'addresses' in dir(u1)
+
+ @testing.resolve_artifact_names
+ def test_collection_set(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=addresses.c.email_address)
+ })
+ sess = create_session(autoflush=True, autocommit=False)
+ u1 = User(name='jack')
+ a1 = Address(email_address='a1')
+ a2 = Address(email_address='a2')
+ a3 = Address(email_address='a3')
+ a4 = Address(email_address='a4')
+
+ sess.add(u1)
+ u1.addresses = [a1, a3]
+ assert list(u1.addresses) == [a1, a3]
+ u1.addresses = [a1, a2, a4]
+ assert list(u1.addresses) == [a1, a2, a4]
+ u1.addresses = [a2, a3]
+ assert list(u1.addresses) == [a2, a3]
+ u1.addresses = []
+ assert list(u1.addresses) == []
+
+
+
+
+ @testing.resolve_artifact_names
+ def test_rollback(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses))
+ })
+ sess = create_session(expire_on_commit=False, autocommit=False, autoflush=True)
+ u1 = User(name='jack')
+ u1.addresses.append(Address(email_address='lala@hoho.com'))
+ sess.add(u1)
+ sess.flush()
+ sess.commit()
+ u1.addresses.append(Address(email_address='foo@bar.com'))
+ eq_(u1.addresses.all(), [Address(email_address='lala@hoho.com'), Address(email_address='foo@bar.com')])
+ sess.rollback()
+ eq_(u1.addresses.all(), [Address(email_address='lala@hoho.com')])
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_delete_nocascade(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=Address.id, backref='user')
+ })
+ sess = create_session(autoflush=True)
+ u = User(name='ed')
+ u.addresses.append(Address(email_address='a'))
+ u.addresses.append(Address(email_address='b'))
+ u.addresses.append(Address(email_address='c'))
+ u.addresses.append(Address(email_address='d'))
+ u.addresses.append(Address(email_address='e'))
+ u.addresses.append(Address(email_address='f'))
+ sess.add(u)
+
+ assert Address(email_address='c') == u.addresses[2]
+ sess.delete(u.addresses[2])
+ sess.delete(u.addresses[4])
+ sess.delete(u.addresses[3])
+ assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
+
+ sess.expunge_all()
+ u = sess.query(User).get(u.id)
+
+ sess.delete(u)
+
+ # u.addresses relation will have to force the load
+ # of all addresses so that they can be updated
+ sess.flush()
+ sess.close()
+
+ assert testing.db.scalar(addresses.count(addresses.c.user_id != None)) ==0
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_delete_cascade(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=Address.id, backref='user', cascade="all, delete-orphan")
+ })
+ sess = create_session(autoflush=True)
+ u = User(name='ed')
+ u.addresses.append(Address(email_address='a'))
+ u.addresses.append(Address(email_address='b'))
+ u.addresses.append(Address(email_address='c'))
+ u.addresses.append(Address(email_address='d'))
+ u.addresses.append(Address(email_address='e'))
+ u.addresses.append(Address(email_address='f'))
+ sess.add(u)
+
+ assert Address(email_address='c') == u.addresses[2]
+ sess.delete(u.addresses[2])
+ sess.delete(u.addresses[4])
+ sess.delete(u.addresses[3])
+ assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
+
+ sess.expunge_all()
+ u = sess.query(User).get(u.id)
+
+ sess.delete(u)
+
+ # u.addresses relation will have to force the load
+ # of all addresses so that they can be updated
+ sess.flush()
+ sess.close()
+
+ assert testing.db.scalar(addresses.count()) ==0
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_remove_orphans(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), order_by=Address.id, cascade="all, delete-orphan", backref='user')
+ })
+ sess = create_session(autoflush=True)
+ u = User(name='ed')
+ u.addresses.append(Address(email_address='a'))
+ u.addresses.append(Address(email_address='b'))
+ u.addresses.append(Address(email_address='c'))
+ u.addresses.append(Address(email_address='d'))
+ u.addresses.append(Address(email_address='e'))
+ u.addresses.append(Address(email_address='f'))
+ sess.add(u)
+
+ assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='c'),
+ Address(email_address='d'), Address(email_address='e'), Address(email_address='f')] == sess.query(Address).all()
+
+ assert Address(email_address='c') == u.addresses[2]
+
+ try:
+ del u.addresses[3]
+ assert False
+ except TypeError, e:
+ assert "doesn't support item deletion" in str(e), str(e)
+
+ for a in u.addresses.filter(Address.email_address.in_(['c', 'e', 'f'])):
+ u.addresses.remove(a)
+
+ assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
+
+ assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == sess.query(Address).all()
+
+ sess.delete(u)
+ sess.close()
+
+
+def _create_backref_test(autoflush, saveuser):
+
+ @testing.resolve_artifact_names
+ def test_backref(self):
+ mapper(User, users, properties={
+ 'addresses':dynamic_loader(mapper(Address, addresses), backref='user')
+ })
+ sess = create_session(autoflush=autoflush)
+
+ u = User(name='buffy')
+
+ a = Address(email_address='foo@bar.com')
+ a.user = u
+
+ if saveuser:
+ sess.add(u)
+ else:
+ sess.add(a)
+
+ if not autoflush:
+ sess.flush()
+
+ assert u in sess
+ assert a in sess
+
+ self.assert_(list(u.addresses) == [a])
+
+ a.user = None
+ if not autoflush:
+ self.assert_(list(u.addresses) == [a])
+
+ if not autoflush:
+ sess.flush()
+ self.assert_(list(u.addresses) == [])
+
+ test_backref = function_named(
+ test_backref, "test%s%s" % ((autoflush and "_autoflush" or ""),
+ (saveuser and "_saveuser" or "_savead")))
+ setattr(SessionTest, test_backref.__name__, test_backref)
+
+for autoflush in (False, True):
+ for saveuser in (False, True):
+ _create_backref_test(autoflush, saveuser)
+
+class DontDereferenceTest(_base.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('users', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(40)),
+ Column('fullname', String(100)),
+ Column('password', String(15)))
+
+ Table('addresses', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('email_address', String(100), nullable=False),
+ Column('user_id', Integer, ForeignKey('users.id')))
+
+ @classmethod
+ @testing.resolve_artifact_names
+ def setup_mappers(cls):
+ class User(_base.ComparableEntity):
+ pass
+
+ class Address(_base.ComparableEntity):
+ pass
+
+ mapper(User, users, properties={
+ 'addresses': relation(Address, backref='user', lazy='dynamic')
+ })
+ mapper(Address, addresses)
+
+ @testing.resolve_artifact_names
+ def test_no_deref(self):
+ session = create_session()
+ user = User()
+ user.name = 'joe'
+ user.fullname = 'Joe User'
+ user.password = 'Joe\'s secret'
+ address = Address()
+ address.email_address = 'joe@joesdomain.example'
+ address.user = user
+ session.add(user)
+ session.flush()
+ session.expunge_all()
+
+ def query1():
+ session = create_session(testing.db)
+ user = session.query(User).first()
+ return user.addresses.all()
+
+ def query2():
+ session = create_session(testing.db)
+ return session.query(User).first().addresses.all()
+
+ def query3():
+ session = create_session(testing.db)
+ user = session.query(User).first()
+ return session.query(User).first().addresses.all()
+
+ eq_(query1(), [Address(email_address='joe@joesdomain.example')])
+ eq_(query2(), [Address(email_address='joe@joesdomain.example')])
+ eq_(query3(), [Address(email_address='joe@joesdomain.example')])
+
+