summaryrefslogtreecommitdiff
path: root/test/orm/query.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm/query.py')
-rw-r--r--test/orm/query.py728
1 files changed, 518 insertions, 210 deletions
diff --git a/test/orm/query.py b/test/orm/query.py
index f1afdb90b..bc67740f2 100644
--- a/test/orm/query.py
+++ b/test/orm/query.py
@@ -1,7 +1,7 @@
import testenv; testenv.configure_for_tests()
import operator
from sqlalchemy import *
-from sqlalchemy import exceptions, util
+from sqlalchemy import exc as sa_exc, util
from sqlalchemy.sql import compiler
from sqlalchemy.engine import default
from sqlalchemy.orm import *
@@ -10,12 +10,13 @@ from testlib import *
from testlib import engines
from testlib.fixtures import *
-from sqlalchemy.orm.util import _join as join, _outerjoin as outerjoin
+from sqlalchemy.orm.util import join, outerjoin, with_parent
class QueryTest(FixtureTest):
keep_mappers = True
keep_data = True
+
def setup_mappers(self):
mapper(User, users, properties={
'addresses':relation(Address, backref='user'),
@@ -68,11 +69,8 @@ class GetTest(QueryTest):
s = create_session()
- try:
- s.query(User).join('addresses').filter(Address.user_id==8).get(7)
- assert False
- except exceptions.SAWarning, e:
- assert str(e) == "Query.get() being called on a Query with existing criterion; criterion is being ignored."
+ q = s.query(User).join('addresses').filter(Address.user_id==8)
+ self.assertRaises(sa_exc.SAWarning, q.get, 7)
@testing.emits_warning('Query.*')
def warns():
@@ -119,7 +117,7 @@ class GetTest(QueryTest):
try:
assert s.query(User).load(19) is None
assert False
- except exceptions.InvalidRequestError:
+ except sa_exc.InvalidRequestError:
assert True
u = s.query(User).load(7)
@@ -193,6 +191,29 @@ class GetTest(QueryTest):
assert u.addresses[0].email_address == 'jack@bean.com'
assert u.orders[1].items[2].description == 'item 5'
+class InvalidGenerationsTest(QueryTest):
+ def test_no_limit_offset(self):
+ s = create_session()
+
+ q = s.query(User).limit(2)
+ self.assertRaises(sa_exc.SAWarning, q.join, "addresses")
+
+ self.assertRaises(sa_exc.SAWarning, q.filter, User.name=='ed')
+
+ self.assertRaises(sa_exc.SAWarning, q.filter_by, name='ed')
+
+ def test_no_from(self):
+ s = create_session()
+
+ q = s.query(User).select_from(users)
+ self.assertRaises(sa_exc.InvalidRequestError, q.select_from, users)
+
+ q = s.query(User).join('addresses')
+ self.assertRaises(sa_exc.InvalidRequestError, q.select_from, users)
+
+ # this is fine, however
+ q.from_self()
+
class OperatorTest(QueryTest):
"""test sql.Comparator implementation for MapperProperties"""
@@ -268,8 +289,40 @@ class OperatorTest(QueryTest):
c = expr.compile(dialect=default.DefaultDialect())
assert str(c) == compare, "%s != %s" % (str(c), compare)
+class RawSelectTest(QueryTest, AssertsCompiledSQL):
+ """compare a bunch of select() tests with the equivalent Query using straight table/columns.
+
+ Results should be the same as Query should act as a select() pass-thru for ClauseElement entities.
+
+ """
+ def test_select(self):
+ sess = create_session()
+
+ self.assert_compile(sess.query(users).select_from(users.select()).with_labels().statement,
+ "SELECT users.id AS users_id, users.name AS users_name FROM users, (SELECT users.id AS id, users.name AS name FROM users) AS anon_1")
+
+ self.assert_compile(sess.query(users, exists([1], from_obj=addresses)).with_labels().statement,
+ "SELECT users.id AS users_id, users.name AS users_name, EXISTS (SELECT 1 FROM addresses) AS anon_1 FROM users")
+ # a little tedious here, adding labels to work around Query's auto-labelling.
+ # also correlate needed explicitly. hmmm.....
+ # TODO: can we detect only one table in the "froms" and then turn off use_labels ?
+ s = sess.query(addresses.c.id.label('id'), addresses.c.email_address.label('email')).\
+ filter(addresses.c.user_id==users.c.id).correlate(users).statement.alias()
+
+ self.assert_compile(sess.query(users, s.c.email).select_from(users.join(s, s.c.id==users.c.id)).with_labels().statement,
+ "SELECT users.id AS users_id, users.name AS users_name, anon_1.email AS anon_1_email "
+ "FROM users JOIN (SELECT addresses.id AS id, addresses.email_address AS email FROM addresses "
+ "WHERE addresses.user_id = users.id) AS anon_1 ON anon_1.id = users.id",
+ dialect=default.DefaultDialect()
+ )
+
+ x = func.lala(users.c.id).label('foo')
+ self.assert_compile(sess.query(x).filter(x==5).statement,
+ "SELECT lala(users.id) AS foo FROM users WHERE lala(users.id) = :param_1", dialect=default.DefaultDialect())
+
class CompileTest(QueryTest):
+
def test_deferred(self):
session = create_session()
s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), Address.user_id==User.id)).compile()
@@ -324,7 +377,7 @@ class FilterTest(QueryTest):
try:
sess.query(User).filter(User.addresses == address)
assert False
- except exceptions.InvalidRequestError:
+ except sa_exc.InvalidRequestError:
assert True
assert [User(id=10)] == sess.query(User).filter(User.addresses==None).all()
@@ -332,7 +385,7 @@ class FilterTest(QueryTest):
try:
assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all()
assert False
- except exceptions.InvalidRequestError:
+ except sa_exc.InvalidRequestError:
assert True
#assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all()
@@ -348,33 +401,15 @@ class FilterTest(QueryTest):
filter(User.addresses.any(id=4)).all()
assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all()
-
- @testing.fails_on_everything_except()
- def test_broken_any_1(self):
- sess = create_session()
- # overcorrelates
+ # test that any() doesn't overcorrelate
assert [User(id=7), User(id=8)] == sess.query(User).join("addresses").filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all()
-
- def test_broken_any_2(self):
- sess = create_session()
- # works, filter is before the join
- assert [User(id=7), User(id=8)] == sess.query(User).filter(~User.addresses.any(Address.email_address=='fred@fred.com')).join("addresses", aliased=True).all()
-
- def test_broken_any_3(self):
- sess = create_session()
-
- # works, filter is after the join, but reset_joinpoint is called, removing aliasing
- assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(Address.email_address != None).reset_joinpoint().filter(~User.addresses.any(email_address='fred@fred.com')).all()
+ # test that the contents are not adapted by the aliased join
+ assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all()
- @testing.fails_on_everything_except()
- def test_broken_any_4(self):
- sess = create_session()
-
- # filter is after the join, gets aliased. in 0.5 any(), has() and not contains() are shielded from aliasing
assert [User(id=10)] == sess.query(User).outerjoin("addresses", aliased=True).filter(~User.addresses.any()).all()
-
+
@testing.unsupported('maxdb') # can core
def test_has(self):
sess = create_session()
@@ -384,6 +419,12 @@ class FilterTest(QueryTest):
assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).all()
+ # test has() doesn't overcorrelate
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).join("user").filter(Address.user.has(User.name.like('%ed%'), id=8)).all()
+
+ # test has() doesnt' get subquery contents adapted by aliased join
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).join("user", aliased=True).filter(Address.user.has(User.name.like('%ed%'), id=8)).all()
+
dingaling = sess.query(Dingaling).get(2)
assert [User(id=9)] == sess.query(User).filter(User.addresses.any(Address.dingaling==dingaling)).all()
@@ -457,23 +498,39 @@ class FromSelfTest(QueryTest):
(User(id=8), Address(id=4)),
(User(id=9), Address(id=5))
] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().join('addresses').add_entity(Address).order_by(User.id, Address.id).all()
+
+ def test_multiple_entities(self):
+ sess = create_session()
+
+ self.assertEquals(
+ sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5]))._from_self().all(),
+ [
+ (User(id=8), Address(id=2)),
+ (User(id=9), Address(id=5))
+ ]
+ )
+
+ self.assertEquals(
+ sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5]))._from_self().options(eagerload('addresses')).first(),
+ (User(id=8, addresses=[Address(), Address(), Address()]), Address(id=2)),
+ )
class AggregateTest(QueryTest):
+
def test_sum(self):
sess = create_session()
orders = sess.query(Order).filter(Order.id.in_([2, 3, 4]))
assert orders.sum(Order.user_id * Order.address_id) == 79
- @testing.uses_deprecated('Call to deprecated function apply_sum')
def test_apply(self):
sess = create_session()
- assert sess.query(Order).apply_sum(Order.user_id * Order.address_id).filter(Order.id.in_([2, 3, 4])).one() == 79
+ assert sess.query(func.sum(Order.user_id * Order.address_id)).filter(Order.id.in_([2, 3, 4])).one() == (79,)
def test_having(self):
sess = create_session()
- assert [User(name=u'ed',id=8)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)> 2).all()
+ assert [User(name=u'ed',id=8)] == sess.query(User).group_by(User).join('addresses').having(func.count(Address.id)> 2).all()
- assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)< 2).all()
+ assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by(User).join('addresses').having(func.count(Address.id)< 2).all()
class CountTest(QueryTest):
def test_basic(self):
@@ -561,10 +618,16 @@ class ParentTest(QueryTest):
o = sess.query(Order).with_parent(u1, property='orders').all()
assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
- # test static method
- o = Query.query_from_parent(u1, property='orders', session=sess).all()
+ o = sess.query(Order).filter(with_parent(u1, User.orders)).all()
assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
-
+
+ # test static method
+ @testing.uses_deprecated(".*query_from_parent")
+ def go():
+ o = Query.query_from_parent(u1, property='orders', session=sess).all()
+ assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o
+ go()
+
# test generative criterion
o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all()
assert [Order(description="order 3"), Order(description="order 5")] == o
@@ -582,7 +645,7 @@ class ParentTest(QueryTest):
try:
q = sess.query(Item).with_parent(u1)
assert False
- except exceptions.InvalidRequestError, e:
+ except sa_exc.InvalidRequestError, e:
assert str(e) == "Could not locate a property which relates instances of class 'Item' to instances of class 'User'"
def test_m2m(self):
@@ -594,28 +657,6 @@ class ParentTest(QueryTest):
class JoinTest(QueryTest):
- def test_getjoinable_tables(self):
- sess = create_session()
-
- sel1 = select([users]).alias()
- sel2 = select([users], from_obj=users.join(addresses)).alias()
-
- j1 = sel1.join(users, sel1.c.id==users.c.id)
- j2 = j1.join(addresses)
-
- for from_obj, assert_cond in (
- (users, [users]),
- (users.join(addresses), [users, addresses]),
- (sel1, [sel1]),
- (sel2, [sel2]),
- (sel1.join(users, sel1.c.id==users.c.id), [sel1, users]),
- (sel2.join(users, sel2.c.id==users.c.id), [sel2, users]),
- (j2, [j1, j2, sel1, users, addresses])
-
- ):
- ret = set(sess.query(User).select_from(from_obj)._get_joinable_tables())
- self.assertEquals(ret, set(assert_cond).union([from_obj]), [x.description for x in ret])
-
def test_overlapping_paths(self):
for aliased in (True,False):
# load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
@@ -654,7 +695,34 @@ class JoinTest(QueryTest):
def test_orderby_arg_bug(self):
sess = create_session()
+ # no arg error
+ result = sess.query(User).join('orders', aliased=True).order_by([Order.id]).reset_joinpoint().order_by(users.c.id).all()
+
+ def test_no_onclause(self):
+ sess = create_session()
+
+ self.assertEquals(
+ sess.query(User).select_from(join(User, Order).join(Item, Order.items)).filter(Item.description == 'item 4').all(),
+ [User(name='jack')]
+ )
+
+ self.assertEquals(
+ sess.query(User).join(Order, (Item, Order.items)).filter(Item.description == 'item 4').all(),
+ [User(name='jack')]
+ )
+ def test_clause_onclause(self):
+ sess = create_session()
+
+ self.assertEquals(
+ sess.query(User).join(
+ (Order, User.id==Order.user_id),
+ (order_items, Order.id==order_items.c.order_id),
+ (Item, order_items.c.item_id==Item.id)
+ ).filter(Item.description == 'item 4').all(),
+ [User(name='jack')]
+ )
+
# no arg error
result = sess.query(User).join('orders', aliased=True).order_by([Order.id]).reset_joinpoint().order_by(users.c.id).all()
@@ -682,13 +750,43 @@ class JoinTest(QueryTest):
l = q.select_from(outerjoin(User, AdAlias)).filter(AdAlias.email_address=='ed@bettyboop.com').all()
self.assertEquals(l, [(user8, address3)])
-
l = q.select_from(outerjoin(User, AdAlias, 'addresses')).filter(AdAlias.email_address=='ed@bettyboop.com').all()
self.assertEquals(l, [(user8, address3)])
l = q.select_from(outerjoin(User, AdAlias, User.id==AdAlias.user_id)).filter(AdAlias.email_address=='ed@bettyboop.com').all()
self.assertEquals(l, [(user8, address3)])
+ # this is the first test where we are joining "backwards" - from AdAlias to User even though
+ # the query is against User
+ q = sess.query(User, AdAlias)
+ l = q.join(AdAlias.user).filter(User.name=='ed')
+ self.assertEquals(l.all(), [(user8, address2),(user8, address3),(user8, address4),])
+
+ q = sess.query(User, AdAlias).select_from(join(AdAlias, User, AdAlias.user)).filter(User.name=='ed')
+ self.assertEquals(l.all(), [(user8, address2),(user8, address3),(user8, address4),])
+
+ def test_implicit_joins_from_aliases(self):
+ sess = create_session()
+ OrderAlias = aliased(Order)
+
+ self.assertEquals(
+ sess.query(OrderAlias).join('items').filter_by(description='item 3').all(),
+ [
+ Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1),
+ Order(address_id=4,description=u'order 2',isopen=0,user_id=9,id=2),
+ Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3)
+ ]
+ )
+
+ self.assertEquals(
+ sess.query(User, OrderAlias, Item.description).join(('orders', OrderAlias), 'items').filter_by(description='item 3').all(),
+ [
+ (User(name=u'jack',id=7), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1), u'item 3'),
+ (User(name=u'jack',id=7), Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3), u'item 3'),
+ (User(name=u'fred',id=9), Order(address_id=4,description=u'order 2',isopen=0,user_id=9,id=2), u'item 3')
+ ]
+ )
+
def test_aliased_classes_m2m(self):
sess = create_session()
@@ -725,20 +823,6 @@ class JoinTest(QueryTest):
]
)
- def test_generative_join(self):
- # test that alised_ids is copied
- sess = create_session()
- q = sess.query(User).add_entity(Address)
- q1 = q.join('addresses', aliased=True)
- q2 = q.join('addresses', aliased=True)
- q3 = q2.join('addresses', aliased=True)
- q4 = q2.join('addresses', aliased=True, id='someid')
- q5 = q2.join('addresses', aliased=True, id='someid')
- q6 = q5.join('addresses', aliased=True, id='someid')
- assert q1._alias_ids[class_mapper(Address)] != q2._alias_ids[class_mapper(Address)]
- assert q2._alias_ids[class_mapper(Address)] != q3._alias_ids[class_mapper(Address)]
- assert q4._alias_ids['someid'] != q5._alias_ids['someid']
-
def test_reset_joinpoint(self):
for aliased in (True, False):
# load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack)
@@ -779,43 +863,19 @@ class JoinTest(QueryTest):
assert q.count() == 1
assert [User(id=7)] == q.all()
+
# test the control version - same joins but not aliased. rows are not returned because order 3 does not have item 1
- # addtionally by placing this test after the previous one, test that the "aliasing" step does not corrupt the
- # join clauses that are cached by the relationship.
- q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Order.description=="item 1")
+ q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Item.description=="item 1")
assert [] == q.all()
assert q.count() == 0
q = sess.query(User).join('orders', aliased=True).filter(Order.items.any(Item.description=='item 4'))
assert [User(id=7)] == q.all()
-
- def test_aliased_add_entity(self):
- """test the usage of aliased joins with add_entity()"""
- sess = create_session()
- q = sess.query(User).join('orders', aliased=True, id='order1').filter(Order.description=="order 3").join(['orders', 'items'], aliased=True, id='item1').filter(Item.description=="item 1")
-
- try:
- q.add_entity(Order, id='fakeid').compile()
- assert False
- except exceptions.InvalidRequestError, e:
- assert str(e) == "Query has no alias identified by 'fakeid'"
-
- try:
- q.add_entity(Order, id='fakeid').instances(None)
- assert False
- except exceptions.InvalidRequestError, e:
- assert str(e) == "Query has no alias identified by 'fakeid'"
-
- q = q.add_entity(Order, id='order1').add_entity(Item, id='item1')
+
+ # test that aliasing gets reset when join() is called
+ q = sess.query(User).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', aliased=True).filter(Order.description=="order 5")
assert q.count() == 1
- assert [(User(id=7), Order(description='order 3'), Item(description='item 1'))] == q.all()
-
- q = sess.query(User).add_entity(Order).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', aliased=True).filter(Order.description=='order 4')
- try:
- q.compile()
- assert False
- except exceptions.InvalidRequestError, e:
- assert str(e) == "Ambiguous join for entity 'Mapper|Order|orders'; specify id=<someid> to query.join()/query.add_entity()"
+ assert [User(id=7)] == q.all()
class MultiplePathTest(ORMTest):
def define_tables(self, metadata):
@@ -849,11 +909,10 @@ class MultiplePathTest(ORMTest):
})
mapper(T2, t2)
- try:
- create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint().join('t2s_2')
- assert False
- except exceptions.InvalidRequestError, e:
- assert str(e) == "Can't join to property 't2s_2'; a path to this table along a different secondary table already exists. Use the `alias=True` argument to `join()`."
+ q = create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint()
+ self.assertRaisesMessage(sa_exc.InvalidRequestError, "a path to this table along a different secondary table already exists.",
+ q.join, 't2s_2'
+ )
create_session().query(T1).join('t2s_1', aliased=True).filter(t2.c.id==5).reset_joinpoint().join('t2s_2').all()
create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint().join('t2s_2', aliased=True).all()
@@ -926,26 +985,34 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
assert fixtures.user_address_result == l
self.assert_sql_count(testing.db, go, 1)
+ # better way. use select_from()
+ def go():
+ l = sess.query(User).select_from(query).options(contains_eager('addresses')).all()
+ assert fixtures.user_address_result == l
+ self.assert_sql_count(testing.db, go, 1)
+
def test_contains_eager(self):
sess = create_session()
+ # test that contains_eager suppresses the normal outer join rendering
q = sess.query(User).outerjoin(User.addresses).options(contains_eager(User.addresses))
- self.assert_compile(q.statement, "SELECT addresses.id AS addresses_id, addresses.user_id AS addresses_user_id, "
- "addresses.email_address AS addresses_email_address, users.id AS users_id, users.name AS users_name "\
- "FROM users LEFT OUTER JOIN addresses ON users.id = addresses.user_id ORDER BY users.id", dialect=default.DefaultDialect())
-
+ self.assert_compile(q.with_labels().statement, "SELECT users.id AS users_id, users.name AS users_name, "\
+ "addresses.id AS addresses_id, addresses.user_id AS addresses_user_id, "\
+ "addresses.email_address AS addresses_email_address FROM users LEFT OUTER JOIN addresses "\
+ "ON users.id = addresses.user_id ORDER BY users.id", dialect=default.DefaultDialect())
+
def go():
assert fixtures.user_address_result == q.all()
self.assert_sql_count(testing.db, go, 1)
sess.clear()
-
+
adalias = addresses.alias()
q = sess.query(User).select_from(users.outerjoin(adalias)).options(contains_eager(User.addresses, alias=adalias))
def go():
assert fixtures.user_address_result == q.all()
self.assert_sql_count(testing.db, go, 1)
sess.clear()
-
+
selectquery = users.outerjoin(addresses).select(users.c.id<10, use_labels=True, order_by=[users.c.id, addresses.c.id])
q = sess.query(User)
@@ -956,6 +1023,13 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
sess.clear()
+
+ def go():
+ l = q.options(contains_eager(User.addresses)).instances(selectquery.execute())
+ assert fixtures.user_address_result[0:3] == l
+ self.assert_sql_count(testing.db, go, 1)
+ sess.clear()
+
def go():
l = q.options(contains_eager('addresses')).from_statement(selectquery).all()
assert fixtures.user_address_result[0:3] == l
@@ -966,38 +1040,34 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.id, adalias.c.id])
sess = create_session()
q = sess.query(User)
-
+
+ # string alias name
def go():
- # test using a string alias name
l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute())
assert fixtures.user_address_result == l
self.assert_sql_count(testing.db, go, 1)
sess.clear()
+ # expression.Alias object
def go():
- # test using the Alias object itself
l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute())
assert fixtures.user_address_result == l
self.assert_sql_count(testing.db, go, 1)
sess.clear()
- def decorate(row):
- d = {}
- for c in addresses.c:
- d[c] = row[adalias.corresponding_column(c)]
- return d
-
+ # Aliased object
+ adalias = aliased(Address)
def go():
- # test using a custom 'decorate' function
- l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute())
- assert fixtures.user_address_result == l
+ l = q.options(contains_eager('addresses', alias=adalias)).outerjoin((adalias, User.addresses)).order_by(User.id, adalias.id)
+ assert fixtures.user_address_result == l.all()
self.assert_sql_count(testing.db, go, 1)
sess.clear()
+
oalias = orders.alias('o1')
ialias = items.alias('i1')
- query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id).order_by(oalias.c.id).order_by(ialias.c.id)
+ query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id, oalias.c.id, ialias.c.id)
q = create_session().query(User)
# test using string alias with more than one level deep
def go():
@@ -1014,9 +1084,24 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
self.assert_sql_count(testing.db, go, 1)
sess.clear()
+ # test using Aliased with more than one level deep
+ oalias = aliased(Order)
+ ialias = aliased(Item)
+ def go():
+ l = q.options(contains_eager(User.orders, alias=oalias), contains_eager(User.orders, Order.items, alias=ialias)).\
+ outerjoin((oalias, User.orders), (ialias, Order.items)).order_by(User.id, oalias.id, ialias.id)
+ assert fixtures.user_order_result == l.all()
+ self.assert_sql_count(testing.db, go, 1)
+ sess.clear()
+
+
+class MixedEntitiesTest(QueryTest):
+
def test_values(self):
sess = create_session()
+ assert list(sess.query(User).values()) == list()
+
sel = users.select(User.id.in_([7, 8])).alias()
q = sess.query(User)
q2 = q.select_from(sel).values(User.name)
@@ -1035,19 +1120,166 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(desc(Address.email_address))[1:3].values(User.name, Address.email_address)
self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@lala.com')])
- q2 = q.join('addresses', aliased=True).filter(User.name.like('%e%')).values(User.name, Address.email_address)
+ adalias = aliased(Address)
+ q2 = q.join(('addresses', adalias)).filter(User.name.like('%e%')).values(User.name, adalias.email_address)
self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')])
q2 = q.values(func.count(User.name))
assert q2.next() == (4,)
- u2 = users.alias()
- q2 = q.select_from(sel).filter(u2.c.id>1).order_by([users.c.id, sel.c.id, u2.c.id]).values(users.c.name, sel.c.name, u2.c.name)
+ u2 = aliased(User)
+ q2 = q.select_from(sel).filter(u2.id>1).order_by([User.id, sel.c.id, u2.id]).values(User.name, sel.c.name, u2.name)
self.assertEquals(list(q2), [(u'jack', u'jack', u'jack'), (u'jack', u'jack', u'ed'), (u'jack', u'jack', u'fred'), (u'jack', u'jack', u'chuck'), (u'ed', u'ed', u'jack'), (u'ed', u'ed', u'ed'), (u'ed', u'ed', u'fred'), (u'ed', u'ed', u'chuck')])
- q2 = q.select_from(sel).filter(users.c.id>1).values(users.c.name, sel.c.name, User.name)
- self.assertEquals(list(q2), [(u'jack', u'jack', u'jack'), (u'ed', u'ed', u'ed')])
+ q2 = q.select_from(sel).filter(User.id==8).values(User.name, sel.c.name, User.name)
+ self.assertEquals(list(q2), [(u'ed', u'ed', u'ed')])
+
+ # using User.xxx is alised against "sel", so this query returns nothing
+ q2 = q.select_from(sel).filter(User.id==8).filter(User.id>sel.c.id).values(User.name, sel.c.name, User.name)
+ self.assertEquals(list(q2), [])
+
+ # whereas this uses users.c.xxx, is not aliased and creates a new join
+ q2 = q.select_from(sel).filter(users.c.id==8).filter(users.c.id>sel.c.id).values(users.c.name, sel.c.name, User.name)
+ self.assertEquals(list(q2), [(u'ed', u'jack', u'jack')])
+ def test_tuple_labeling(self):
+ sess = create_session()
+ for row in sess.query(User, Address).join(User.addresses).all():
+ self.assertEquals(set(row.keys()), set(['User', 'Address']))
+ self.assertEquals(row.User, row[0])
+ self.assertEquals(row.Address, row[1])
+
+ for row in sess.query(User.name, User.id.label('foobar')):
+ self.assertEquals(set(row.keys()), set(['name', 'foobar']))
+ self.assertEquals(row.name, row[0])
+ self.assertEquals(row.foobar, row[1])
+
+ for row in sess.query(User).values(User.name, User.id.label('foobar')):
+ self.assertEquals(set(row.keys()), set(['name', 'foobar']))
+ self.assertEquals(row.name, row[0])
+ self.assertEquals(row.foobar, row[1])
+
+ oalias = aliased(Order)
+ for row in sess.query(User, oalias).join(User.orders).all():
+ self.assertEquals(set(row.keys()), set(['User']))
+ self.assertEquals(row.User, row[0])
+
+ oalias = aliased(Order, name='orders')
+ for row in sess.query(User, oalias).join(User.orders).all():
+ self.assertEquals(set(row.keys()), set(['User', 'orders']))
+ self.assertEquals(row.User, row[0])
+ self.assertEquals(row.orders, row[1])
+
+
+ def test_column_queries(self):
+ sess = create_session()
+
+ self.assertEquals(sess.query(User.name).all(), [(u'jack',), (u'ed',), (u'fred',), (u'chuck',)])
+
+ sel = users.select(User.id.in_([7, 8])).alias()
+ q = sess.query(User.name)
+ q2 = q.select_from(sel).all()
+ self.assertEquals(list(q2), [(u'jack',), (u'ed',)])
+
+ self.assertEquals(sess.query(User.name, Address.email_address).filter(User.id==Address.user_id).all(), [
+ (u'jack', u'jack@bean.com'), (u'ed', u'ed@wood.com'),
+ (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'),
+ (u'fred', u'fred@fred.com')
+ ])
+
+ self.assertEquals(sess.query(User.name, func.count(Address.email_address)).outerjoin(User.addresses).group_by(User.id, User.name).order_by(User.id).all(),
+ [(u'jack', 1), (u'ed', 3), (u'fred', 1), (u'chuck', 0)]
+ )
+
+ self.assertEquals(sess.query(User, func.count(Address.email_address)).outerjoin(User.addresses).group_by(User).order_by(User.id).all(),
+ [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)]
+ )
+
+ self.assertEquals(sess.query(func.count(Address.email_address), User).outerjoin(User.addresses).group_by(User).order_by(User.id).all(),
+ [(1, User(name='jack',id=7)), (3, User(name='ed',id=8)), (1, User(name='fred',id=9)), (0, User(name='chuck',id=10))]
+ )
+
+ adalias = aliased(Address)
+ self.assertEquals(sess.query(User, func.count(adalias.email_address)).outerjoin(('addresses', adalias)).group_by(User).order_by(User.id).all(),
+ [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)]
+ )
+
+ self.assertEquals(sess.query(func.count(adalias.email_address), User).outerjoin((User.addresses, adalias)).group_by(User).order_by(User.id).all(),
+ [(1, User(name=u'jack',id=7)), (3, User(name=u'ed',id=8)), (1, User(name=u'fred',id=9)), (0, User(name=u'chuck',id=10))]
+ )
+
+ # select from aliasing + explicit aliasing
+ self.assertEquals(
+ sess.query(User, adalias.email_address, adalias.id).outerjoin((User.addresses, adalias)).from_self(User, adalias.email_address).order_by(User.id, adalias.id).all(),
+ [
+ (User(name=u'jack',id=7), u'jack@bean.com'),
+ (User(name=u'ed',id=8), u'ed@wood.com'),
+ (User(name=u'ed',id=8), u'ed@bettyboop.com'),
+ (User(name=u'ed',id=8), u'ed@lala.com'),
+ (User(name=u'fred',id=9), u'fred@fred.com'),
+ (User(name=u'chuck',id=10), None)
+ ]
+ )
+
+ # anon + select from aliasing
+ self.assertEquals(
+ sess.query(User).join(User.addresses, aliased=True).filter(Address.email_address.like('%ed%')).from_self().all(),
+ [
+ User(name=u'ed',id=8),
+ User(name=u'fred',id=9),
+ ]
+ )
+
+ # test eager aliasing, with/without select_from aliasing
+ for q in [
+ sess.query(User, adalias.email_address).outerjoin((User.addresses, adalias)).options(eagerload(User.addresses)).order_by(User.id, adalias.id).limit(10),
+ sess.query(User, adalias.email_address, adalias.id).outerjoin((User.addresses, adalias)).from_self(User, adalias.email_address).options(eagerload(User.addresses)).order_by(User.id, adalias.id).limit(10),
+ ]:
+ self.assertEquals(
+ q.all(),
+ [(User(addresses=[Address(user_id=7,email_address=u'jack@bean.com',id=1)],name=u'jack',id=7), u'jack@bean.com'),
+ (User(addresses=[
+ Address(user_id=8,email_address=u'ed@wood.com',id=2),
+ Address(user_id=8,email_address=u'ed@bettyboop.com',id=3),
+ Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@wood.com'),
+ (User(addresses=[
+ Address(user_id=8,email_address=u'ed@wood.com',id=2),
+ Address(user_id=8,email_address=u'ed@bettyboop.com',id=3),
+ Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@bettyboop.com'),
+ (User(addresses=[
+ Address(user_id=8,email_address=u'ed@wood.com',id=2),
+ Address(user_id=8,email_address=u'ed@bettyboop.com',id=3),
+ Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@lala.com'),
+ (User(addresses=[Address(user_id=9,email_address=u'fred@fred.com',id=5)],name=u'fred',id=9), u'fred@fred.com'),
+
+ (User(addresses=[],name=u'chuck',id=10), None)]
+ )
+
+ def test_self_referential(self):
+
+ sess = create_session()
+ oalias = aliased(Order)
+
+ for q in [
+ sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id),
+ sess.query(Order, oalias)._from_self().filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id),
+ # here we go....two layers of aliasing
+ sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id)._from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)),
+
+ # gratuitous four layers
+ sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id)._from_self()._from_self()._from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)),
+
+ ]:
+
+ self.assertEquals(
+ q.all(),
+ [
+ (Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1)),
+ (Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1)),
+ (Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5), Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3))
+ ]
+ )
+
def test_multi_mappers(self):
test_session = create_session()
@@ -1055,7 +1287,6 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
(user7, user8, user9, user10) = test_session.query(User).all()
(address1, address2, address3, address4, address5) = test_session.query(Address).all()
- # note the result is a cartesian product
expected = [(user7, address1),
(user8, address2),
(user8, address3),
@@ -1066,30 +1297,24 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
sess = create_session()
selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
- q = sess.query(User)
- l = q.instances(selectquery.execute(), Address)
- assert l == expected
-
+ self.assertEquals(sess.query(User, Address).instances(selectquery.execute()), expected)
sess.clear()
- for aliased in (False, True):
- q = sess.query(User)
-
- q = q.add_entity(Address).outerjoin('addresses', aliased=aliased)
- l = q.all()
- assert l == expected
+ for address_entity in (Address, aliased(Address)):
+ q = sess.query(User).add_entity(address_entity).outerjoin(('addresses', address_entity)).order_by(User.id, address_entity.id)
+ self.assertEquals(q.all(), expected)
sess.clear()
- q = sess.query(User).add_entity(Address)
- l = q.join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com').all()
- assert l == [(user8, address3)]
+ q = sess.query(User).add_entity(address_entity)
+ q = q.join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com')
+ self.assertEquals(q.all(), [(user8, address3)])
sess.clear()
- q = sess.query(User, Address).join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com')
- assert q.all() == [(user8, address3)]
+ q = sess.query(User, address_entity).join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com')
+ self.assertEquals(q.all(), [(user8, address3)])
sess.clear()
- q = sess.query(User, Address).join('addresses', aliased=aliased).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
+ q = sess.query(User, address_entity).join(('addresses', address_entity)).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
self.assertEquals(list(util.OrderedSet(q.all())), [(user8, address3)])
sess.clear()
@@ -1123,18 +1348,12 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
expected = [(u, u.name) for u in sess.query(User).all()]
- for add_col in (User.name, users.c.name, User.c.name):
+ for add_col in (User.name, users.c.name):
assert sess.query(User).add_column(add_col).all() == expected
sess.clear()
- self.assertRaises(exceptions.InvalidRequestError, sess.query(User).add_column, object())
+ self.assertRaises(sa_exc.InvalidRequestError, sess.query(User).add_column, object())
- def test_ambiguous_column(self):
- sess = create_session()
-
- q = sess.query(User).join('addresses', aliased=True).join('addresses', aliased=True).add_column(Address.id)
- self.assertRaises(exceptions.InvalidRequestError, iter, q)
-
def test_multi_columns_2(self):
"""test aliased/nonalised joins with the usage of add_column()"""
sess = create_session()
@@ -1146,12 +1365,16 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
(user10, 0)
]
- for aliased in (False, True):
- q = sess.query(User)
- q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses', aliased=aliased).add_column(func.count(Address.id).label('count'))
- l = q.all()
- assert l == expected
- sess.clear()
+ q = sess.query(User)
+ q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses').add_column(func.count(Address.id).label('count'))
+ self.assertEquals(q.all(), expected)
+ sess.clear()
+
+ adalias = aliased(Address)
+ q = sess.query(User)
+ q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin(('addresses', adalias)).add_column(func.count(adalias.id).label('count'))
+ self.assertEquals(q.all(), expected)
+ sess.clear()
s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id)
q = sess.query(User)
@@ -1159,7 +1382,7 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
assert l == expected
- def test_two_columns(self):
+ def test_raw_columns(self):
sess = create_session()
(user7, user8, user9, user10) = sess.query(User).all()
expected = [
@@ -1168,8 +1391,9 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
(user9, 1, "Name:fred"),
(user10, 0, "Name:chuck")]
- q = create_session().query(User).add_column(func.count(addresses.c.id))\
- .add_column(("Name:" + users.c.name)).outerjoin('addresses', aliased=True)\
+ adalias = addresses.alias()
+ q = create_session().query(User).add_column(func.count(adalias.c.id))\
+ .add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\
.group_by([c for c in users.c]).order_by(users.c.id)
assert q.all() == expected
@@ -1190,14 +1414,19 @@ class InstancesTest(QueryTest, AssertsCompiledSQL):
assert q.all() == expected
sess.clear()
- # test with outerjoin() both aliased and non
- for aliased in (False, True):
- q = create_session().query(User).add_column(func.count(addresses.c.id))\
- .add_column(("Name:" + users.c.name)).outerjoin('addresses', aliased=aliased)\
- .group_by([c for c in users.c]).order_by(users.c.id)
+ q = create_session().query(User).add_column(func.count(addresses.c.id))\
+ .add_column(("Name:" + users.c.name)).outerjoin('addresses')\
+ .group_by([c for c in users.c]).order_by(users.c.id)
- assert q.all() == expected
- sess.clear()
+ assert q.all() == expected
+ sess.clear()
+
+ q = create_session().query(User).add_column(func.count(adalias.c.id))\
+ .add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\
+ .group_by([c for c in users.c]).order_by(users.c.id)
+
+ assert q.all() == expected
+ sess.clear()
class SelectFromTest(QueryTest):
@@ -1217,7 +1446,7 @@ class SelectFromTest(QueryTest):
self.assertEquals(sess.query(User).select_from(sel).all(), [User(id=7), User(id=8)])
- self.assertEquals(sess.query(User).select_from(sel).filter(User.c.id==8).all(), [User(id=8)])
+ self.assertEquals(sess.query(User).select_from(sel).filter(User.id==8).all(), [User(id=8)])
self.assertEquals(sess.query(User).select_from(sel).order_by(desc(User.name)).all(), [
User(name='jack',id=7), User(name='ed',id=8)
@@ -1273,7 +1502,8 @@ class SelectFromTest(QueryTest):
]
)
- self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(),
+ adalias = aliased(Address)
+ self.assertEquals(sess.query(User).select_from(sel).join(('addresses', adalias)).add_entity(adalias).order_by(User.id).order_by(adalias.id).all(),
[
(User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)),
(User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)),
@@ -1297,12 +1527,15 @@ class SelectFromTest(QueryTest):
sel = users.select(users.c.id.in_([7, 8]))
sess = create_session()
+
+ # TODO: remove
+ sess.query(User).select_from(sel).options(eagerload_all('orders.items.keywords')).join('orders', 'items', 'keywords', aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all()
- self.assertEquals(sess.query(User).select_from(sel).join(['orders', 'items', 'keywords']).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
+ self.assertEquals(sess.query(User).select_from(sel).join('orders', 'items', 'keywords').filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
User(name=u'jack',id=7)
])
- self.assertEquals(sess.query(User).select_from(sel).join(['orders', 'items', 'keywords'], aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
+ self.assertEquals(sess.query(User).select_from(sel).join('orders', 'items', 'keywords', aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [
User(name=u'jack',id=7)
])
@@ -1355,7 +1588,7 @@ class SelectFromTest(QueryTest):
sess.clear()
def go():
- self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(),
+ self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.id==8).all(),
[User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])]
)
self.assert_sql_count(testing.db, go, 1)
@@ -1364,7 +1597,7 @@ class SelectFromTest(QueryTest):
def go():
self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel)[1], User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)]))
self.assert_sql_count(testing.db, go, 1)
-
+
class CustomJoinTest(QueryTest):
keep_mappers = False
@@ -1428,6 +1661,10 @@ class SelfReferentialTest(ORMTest):
node = sess.query(Node).join('children', aliased=True).filter_by(data='n122').first()
assert node.data=='n12'
+ ret = sess.query(Node.data).join(Node.children, aliased=True).filter_by(data='n122').all()
+ assert ret == [('n12',)]
+
+
node = sess.query(Node).join(['children', 'children'], aliased=True).filter_by(data='n122').first()
assert node.data=='n1'
@@ -1461,10 +1698,66 @@ class SelfReferentialTest(ORMTest):
list(sess.query(Node).select_from(join(Node, n1, 'parent').join(n2, 'parent')).\
filter(and_(Node.data=='n122', n1.data=='n12', n2.data=='n1')).values(Node.data, n1.data, n2.data)),
[('n122', 'n12', 'n1')])
+
+ def test_join_to_nonaliased(self):
+ sess = create_session()
- def test_any(self):
+ n1 = aliased(Node)
+
+ # using 'n1.parent' implicitly joins to unaliased Node
+ self.assertEquals(
+ sess.query(n1).join(n1.parent).filter(Node.data=='n1').all(),
+ [Node(parent_id=1,data=u'n11',id=2), Node(parent_id=1,data=u'n12',id=3), Node(parent_id=1,data=u'n13',id=4)]
+ )
+
+ # explicit (new syntax)
+ self.assertEquals(
+ sess.query(n1).join((Node, n1.parent)).filter(Node.data=='n1').all(),
+ [Node(parent_id=1,data=u'n11',id=2), Node(parent_id=1,data=u'n12',id=3), Node(parent_id=1,data=u'n13',id=4)]
+ )
+
+ def test_multiple_explicit_entities(self):
sess = create_session()
+ parent = aliased(Node)
+ grandparent = aliased(Node)
+ self.assertEquals(
+ sess.query(Node, parent, grandparent).\
+ join((Node.parent, parent), (parent.parent, grandparent)).\
+ filter(Node.data=='n122').filter(parent.data=='n12').\
+ filter(grandparent.data=='n1').first(),
+ (Node(data='n122'), Node(data='n12'), Node(data='n1'))
+ )
+
+ self.assertEquals(
+ sess.query(Node, parent, grandparent).\
+ join((Node.parent, parent), (parent.parent, grandparent)).\
+ filter(Node.data=='n122').filter(parent.data=='n12').\
+ filter(grandparent.data=='n1')._from_self().first(),
+ (Node(data='n122'), Node(data='n12'), Node(data='n1'))
+ )
+
+ self.assertEquals(
+ sess.query(Node, parent, grandparent).\
+ join((Node.parent, parent), (parent.parent, grandparent)).\
+ filter(Node.data=='n122').filter(parent.data=='n12').\
+ filter(grandparent.data=='n1').\
+ options(eagerload(Node.children)).first(),
+ (Node(data='n122'), Node(data='n12'), Node(data='n1'))
+ )
+
+ self.assertEquals(
+ sess.query(Node, parent, grandparent).\
+ join((Node.parent, parent), (parent.parent, grandparent)).\
+ filter(Node.data=='n122').filter(parent.data=='n12').\
+ filter(grandparent.data=='n1')._from_self().\
+ options(eagerload(Node.children)).first(),
+ (Node(data='n122'), Node(data='n12'), Node(data='n1'))
+ )
+
+
+ def test_any(self):
+ sess = create_session()
self.assertEquals(sess.query(Node).filter(Node.children.any(Node.data=='n1')).all(), [])
self.assertEquals(sess.query(Node).filter(Node.children.any(Node.data=='n12')).all(), [Node(data='n1')])
self.assertEquals(sess.query(Node).filter(~Node.children.any()).all(), [Node(data='n11'), Node(data='n13'),Node(data='n121'),Node(data='n122'),Node(data='n123'),])
@@ -1561,6 +1854,8 @@ class SelfReferentialM2MTest(ORMTest):
)
class ExternalColumnsTest(QueryTest):
+ """test mappers with SQL-expressions added as column properties."""
+
keep_mappers = False
def setup_mappers(self):
@@ -1568,15 +1863,11 @@ class ExternalColumnsTest(QueryTest):
def test_external_columns_bad(self):
- self.assertRaisesMessage(exceptions.ArgumentError, "not represented in mapper's table", mapper, User, users, properties={
+ self.assertRaisesMessage(sa_exc.ArgumentError, "not represented in mapper's table", mapper, User, users, properties={
'concat': (users.c.id * 2),
})
clear_mappers()
- self.assertRaisesMessage(exceptions.ArgumentError, "must be given a ColumnElement as its argument.", column_property,
- select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users)
- )
-
def test_external_columns_good(self):
"""test querying mappings that reference external columns or selectables."""
@@ -1586,19 +1877,21 @@ class ExternalColumnsTest(QueryTest):
})
mapper(Address, addresses, properties={
- 'user':relation(User, lazy=True)
+ 'user':relation(User)
})
sess = create_session()
-
- l = sess.query(User).all()
- assert [
- User(id=7, concat=14, count=1),
- User(id=8, concat=16, count=3),
- User(id=9, concat=18, count=1),
- User(id=10, concat=20, count=0),
- ] == l
+ sess.query(Address).options(eagerload('user')).all()
+
+ self.assertEquals(sess.query(User).all(),
+ [
+ User(id=7, concat=14, count=1),
+ User(id=8, concat=16, count=3),
+ User(id=9, concat=18, count=1),
+ User(id=10, concat=20, count=0),
+ ]
+ )
address_result = [
Address(id=1, user=User(id=7, concat=14, count=1)),
@@ -1617,15 +1910,24 @@ class ExternalColumnsTest(QueryTest):
self.assertEquals(sess.query(Address).options(eagerload('user')).all(), address_result)
self.assert_sql_count(testing.db, go, 1)
- tuple_address_result = [(address, address.user) for address in address_result]
-
- q =sess.query(Address).join('user', aliased=True, id='ualias').join('user', aliased=True).add_column(User.concat)
- self.assertRaisesMessage(exceptions.InvalidRequestError, "Ambiguous", q.all)
-
- self.assertEquals(sess.query(Address).join('user', aliased=True, id='ualias').add_entity(User, id='ualias').all(), tuple_address_result)
+ ualias = aliased(User)
+ self.assertEquals(
+ sess.query(Address, ualias).join(('user', ualias)).all(),
+ [(address, address.user) for address in address_result]
+ )
- self.assertEquals(sess.query(Address).join('user', aliased=True, id='ualias').join('user', aliased=True).\
- add_column(User.concat, id='ualias').add_column(User.count, id='ualias').all(),
+ self.assertEquals(
+ sess.query(Address, ualias.count).join(('user', ualias)).join('user', aliased=True).all(),
+ [
+ (Address(id=1), 1),
+ (Address(id=2), 3),
+ (Address(id=3), 3),
+ (Address(id=4), 3),
+ (Address(id=5), 1)
+ ]
+ )
+
+ self.assertEquals(sess.query(Address, ualias.concat, ualias.count).join(('user', ualias)).join('user', aliased=True).all(),
[
(Address(id=1), 14, 1),
(Address(id=2), 16, 3),
@@ -1635,15 +1937,21 @@ class ExternalColumnsTest(QueryTest):
]
)
- self.assertEquals(list(sess.query(Address).join('user').values(Address.id, User.id, User.concat, User.count)),
- [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
+ ua = aliased(User)
+ self.assertEquals(sess.query(Address, ua.concat, ua.count).select_from(join(Address, ua, 'user')).options(eagerload(Address.user)).all(),
+ [
+ (Address(id=1, user=User(id=7, concat=14, count=1)), 14, 1),
+ (Address(id=2, user=User(id=8, concat=16, count=3)), 16, 3),
+ (Address(id=3, user=User(id=8, concat=16, count=3)), 16, 3),
+ (Address(id=4, user=User(id=8, concat=16, count=3)), 16, 3),
+ (Address(id=5, user=User(id=9, concat=18, count=1)), 18, 1)
+ ]
)
- self.assertEquals(list(sess.query(Address).join('user', aliased=True).values(Address.id, User.id, User.concat, User.count)),
+ self.assertEquals(list(sess.query(Address).join('user').values(Address.id, User.id, User.concat, User.count)),
[(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
)
- ua = aliased(User)
self.assertEquals(list(sess.query(Address, ua).select_from(join(Address,ua, 'user')).values(Address.id, ua.id, ua.concat, ua.count)),
[(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)]
)