diff options
Diffstat (limited to 'test/orm/query.py')
| -rw-r--r-- | test/orm/query.py | 728 |
1 files changed, 518 insertions, 210 deletions
diff --git a/test/orm/query.py b/test/orm/query.py index f1afdb90b..bc67740f2 100644 --- a/test/orm/query.py +++ b/test/orm/query.py @@ -1,7 +1,7 @@ import testenv; testenv.configure_for_tests() import operator from sqlalchemy import * -from sqlalchemy import exceptions, util +from sqlalchemy import exc as sa_exc, util from sqlalchemy.sql import compiler from sqlalchemy.engine import default from sqlalchemy.orm import * @@ -10,12 +10,13 @@ from testlib import * from testlib import engines from testlib.fixtures import * -from sqlalchemy.orm.util import _join as join, _outerjoin as outerjoin +from sqlalchemy.orm.util import join, outerjoin, with_parent class QueryTest(FixtureTest): keep_mappers = True keep_data = True + def setup_mappers(self): mapper(User, users, properties={ 'addresses':relation(Address, backref='user'), @@ -68,11 +69,8 @@ class GetTest(QueryTest): s = create_session() - try: - s.query(User).join('addresses').filter(Address.user_id==8).get(7) - assert False - except exceptions.SAWarning, e: - assert str(e) == "Query.get() being called on a Query with existing criterion; criterion is being ignored." + q = s.query(User).join('addresses').filter(Address.user_id==8) + self.assertRaises(sa_exc.SAWarning, q.get, 7) @testing.emits_warning('Query.*') def warns(): @@ -119,7 +117,7 @@ class GetTest(QueryTest): try: assert s.query(User).load(19) is None assert False - except exceptions.InvalidRequestError: + except sa_exc.InvalidRequestError: assert True u = s.query(User).load(7) @@ -193,6 +191,29 @@ class GetTest(QueryTest): assert u.addresses[0].email_address == 'jack@bean.com' assert u.orders[1].items[2].description == 'item 5' +class InvalidGenerationsTest(QueryTest): + def test_no_limit_offset(self): + s = create_session() + + q = s.query(User).limit(2) + self.assertRaises(sa_exc.SAWarning, q.join, "addresses") + + self.assertRaises(sa_exc.SAWarning, q.filter, User.name=='ed') + + self.assertRaises(sa_exc.SAWarning, q.filter_by, name='ed') + + def test_no_from(self): + s = create_session() + + q = s.query(User).select_from(users) + self.assertRaises(sa_exc.InvalidRequestError, q.select_from, users) + + q = s.query(User).join('addresses') + self.assertRaises(sa_exc.InvalidRequestError, q.select_from, users) + + # this is fine, however + q.from_self() + class OperatorTest(QueryTest): """test sql.Comparator implementation for MapperProperties""" @@ -268,8 +289,40 @@ class OperatorTest(QueryTest): c = expr.compile(dialect=default.DefaultDialect()) assert str(c) == compare, "%s != %s" % (str(c), compare) +class RawSelectTest(QueryTest, AssertsCompiledSQL): + """compare a bunch of select() tests with the equivalent Query using straight table/columns. + + Results should be the same as Query should act as a select() pass-thru for ClauseElement entities. + + """ + def test_select(self): + sess = create_session() + + self.assert_compile(sess.query(users).select_from(users.select()).with_labels().statement, + "SELECT users.id AS users_id, users.name AS users_name FROM users, (SELECT users.id AS id, users.name AS name FROM users) AS anon_1") + + self.assert_compile(sess.query(users, exists([1], from_obj=addresses)).with_labels().statement, + "SELECT users.id AS users_id, users.name AS users_name, EXISTS (SELECT 1 FROM addresses) AS anon_1 FROM users") + # a little tedious here, adding labels to work around Query's auto-labelling. + # also correlate needed explicitly. hmmm..... + # TODO: can we detect only one table in the "froms" and then turn off use_labels ? + s = sess.query(addresses.c.id.label('id'), addresses.c.email_address.label('email')).\ + filter(addresses.c.user_id==users.c.id).correlate(users).statement.alias() + + self.assert_compile(sess.query(users, s.c.email).select_from(users.join(s, s.c.id==users.c.id)).with_labels().statement, + "SELECT users.id AS users_id, users.name AS users_name, anon_1.email AS anon_1_email " + "FROM users JOIN (SELECT addresses.id AS id, addresses.email_address AS email FROM addresses " + "WHERE addresses.user_id = users.id) AS anon_1 ON anon_1.id = users.id", + dialect=default.DefaultDialect() + ) + + x = func.lala(users.c.id).label('foo') + self.assert_compile(sess.query(x).filter(x==5).statement, + "SELECT lala(users.id) AS foo FROM users WHERE lala(users.id) = :param_1", dialect=default.DefaultDialect()) + class CompileTest(QueryTest): + def test_deferred(self): session = create_session() s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), Address.user_id==User.id)).compile() @@ -324,7 +377,7 @@ class FilterTest(QueryTest): try: sess.query(User).filter(User.addresses == address) assert False - except exceptions.InvalidRequestError: + except sa_exc.InvalidRequestError: assert True assert [User(id=10)] == sess.query(User).filter(User.addresses==None).all() @@ -332,7 +385,7 @@ class FilterTest(QueryTest): try: assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all() assert False - except exceptions.InvalidRequestError: + except sa_exc.InvalidRequestError: assert True #assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all() @@ -348,33 +401,15 @@ class FilterTest(QueryTest): filter(User.addresses.any(id=4)).all() assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all() - - @testing.fails_on_everything_except() - def test_broken_any_1(self): - sess = create_session() - # overcorrelates + # test that any() doesn't overcorrelate assert [User(id=7), User(id=8)] == sess.query(User).join("addresses").filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all() - - def test_broken_any_2(self): - sess = create_session() - # works, filter is before the join - assert [User(id=7), User(id=8)] == sess.query(User).filter(~User.addresses.any(Address.email_address=='fred@fred.com')).join("addresses", aliased=True).all() - - def test_broken_any_3(self): - sess = create_session() - - # works, filter is after the join, but reset_joinpoint is called, removing aliasing - assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(Address.email_address != None).reset_joinpoint().filter(~User.addresses.any(email_address='fred@fred.com')).all() + # test that the contents are not adapted by the aliased join + assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all() - @testing.fails_on_everything_except() - def test_broken_any_4(self): - sess = create_session() - - # filter is after the join, gets aliased. in 0.5 any(), has() and not contains() are shielded from aliasing assert [User(id=10)] == sess.query(User).outerjoin("addresses", aliased=True).filter(~User.addresses.any()).all() - + @testing.unsupported('maxdb') # can core def test_has(self): sess = create_session() @@ -384,6 +419,12 @@ class FilterTest(QueryTest): assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).all() + # test has() doesn't overcorrelate + assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).join("user").filter(Address.user.has(User.name.like('%ed%'), id=8)).all() + + # test has() doesnt' get subquery contents adapted by aliased join + assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).join("user", aliased=True).filter(Address.user.has(User.name.like('%ed%'), id=8)).all() + dingaling = sess.query(Dingaling).get(2) assert [User(id=9)] == sess.query(User).filter(User.addresses.any(Address.dingaling==dingaling)).all() @@ -457,23 +498,39 @@ class FromSelfTest(QueryTest): (User(id=8), Address(id=4)), (User(id=9), Address(id=5)) ] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().join('addresses').add_entity(Address).order_by(User.id, Address.id).all() + + def test_multiple_entities(self): + sess = create_session() + + self.assertEquals( + sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5]))._from_self().all(), + [ + (User(id=8), Address(id=2)), + (User(id=9), Address(id=5)) + ] + ) + + self.assertEquals( + sess.query(User, Address).filter(User.id==Address.user_id).filter(Address.id.in_([2, 5]))._from_self().options(eagerload('addresses')).first(), + (User(id=8, addresses=[Address(), Address(), Address()]), Address(id=2)), + ) class AggregateTest(QueryTest): + def test_sum(self): sess = create_session() orders = sess.query(Order).filter(Order.id.in_([2, 3, 4])) assert orders.sum(Order.user_id * Order.address_id) == 79 - @testing.uses_deprecated('Call to deprecated function apply_sum') def test_apply(self): sess = create_session() - assert sess.query(Order).apply_sum(Order.user_id * Order.address_id).filter(Order.id.in_([2, 3, 4])).one() == 79 + assert sess.query(func.sum(Order.user_id * Order.address_id)).filter(Order.id.in_([2, 3, 4])).one() == (79,) def test_having(self): sess = create_session() - assert [User(name=u'ed',id=8)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)> 2).all() + assert [User(name=u'ed',id=8)] == sess.query(User).group_by(User).join('addresses').having(func.count(Address.id)> 2).all() - assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)< 2).all() + assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by(User).join('addresses').having(func.count(Address.id)< 2).all() class CountTest(QueryTest): def test_basic(self): @@ -561,10 +618,16 @@ class ParentTest(QueryTest): o = sess.query(Order).with_parent(u1, property='orders').all() assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o - # test static method - o = Query.query_from_parent(u1, property='orders', session=sess).all() + o = sess.query(Order).filter(with_parent(u1, User.orders)).all() assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o - + + # test static method + @testing.uses_deprecated(".*query_from_parent") + def go(): + o = Query.query_from_parent(u1, property='orders', session=sess).all() + assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + go() + # test generative criterion o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all() assert [Order(description="order 3"), Order(description="order 5")] == o @@ -582,7 +645,7 @@ class ParentTest(QueryTest): try: q = sess.query(Item).with_parent(u1) assert False - except exceptions.InvalidRequestError, e: + except sa_exc.InvalidRequestError, e: assert str(e) == "Could not locate a property which relates instances of class 'Item' to instances of class 'User'" def test_m2m(self): @@ -594,28 +657,6 @@ class ParentTest(QueryTest): class JoinTest(QueryTest): - def test_getjoinable_tables(self): - sess = create_session() - - sel1 = select([users]).alias() - sel2 = select([users], from_obj=users.join(addresses)).alias() - - j1 = sel1.join(users, sel1.c.id==users.c.id) - j2 = j1.join(addresses) - - for from_obj, assert_cond in ( - (users, [users]), - (users.join(addresses), [users, addresses]), - (sel1, [sel1]), - (sel2, [sel2]), - (sel1.join(users, sel1.c.id==users.c.id), [sel1, users]), - (sel2.join(users, sel2.c.id==users.c.id), [sel2, users]), - (j2, [j1, j2, sel1, users, addresses]) - - ): - ret = set(sess.query(User).select_from(from_obj)._get_joinable_tables()) - self.assertEquals(ret, set(assert_cond).union([from_obj]), [x.description for x in ret]) - def test_overlapping_paths(self): for aliased in (True,False): # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack) @@ -654,7 +695,34 @@ class JoinTest(QueryTest): def test_orderby_arg_bug(self): sess = create_session() + # no arg error + result = sess.query(User).join('orders', aliased=True).order_by([Order.id]).reset_joinpoint().order_by(users.c.id).all() + + def test_no_onclause(self): + sess = create_session() + + self.assertEquals( + sess.query(User).select_from(join(User, Order).join(Item, Order.items)).filter(Item.description == 'item 4').all(), + [User(name='jack')] + ) + + self.assertEquals( + sess.query(User).join(Order, (Item, Order.items)).filter(Item.description == 'item 4').all(), + [User(name='jack')] + ) + def test_clause_onclause(self): + sess = create_session() + + self.assertEquals( + sess.query(User).join( + (Order, User.id==Order.user_id), + (order_items, Order.id==order_items.c.order_id), + (Item, order_items.c.item_id==Item.id) + ).filter(Item.description == 'item 4').all(), + [User(name='jack')] + ) + # no arg error result = sess.query(User).join('orders', aliased=True).order_by([Order.id]).reset_joinpoint().order_by(users.c.id).all() @@ -682,13 +750,43 @@ class JoinTest(QueryTest): l = q.select_from(outerjoin(User, AdAlias)).filter(AdAlias.email_address=='ed@bettyboop.com').all() self.assertEquals(l, [(user8, address3)]) - l = q.select_from(outerjoin(User, AdAlias, 'addresses')).filter(AdAlias.email_address=='ed@bettyboop.com').all() self.assertEquals(l, [(user8, address3)]) l = q.select_from(outerjoin(User, AdAlias, User.id==AdAlias.user_id)).filter(AdAlias.email_address=='ed@bettyboop.com').all() self.assertEquals(l, [(user8, address3)]) + # this is the first test where we are joining "backwards" - from AdAlias to User even though + # the query is against User + q = sess.query(User, AdAlias) + l = q.join(AdAlias.user).filter(User.name=='ed') + self.assertEquals(l.all(), [(user8, address2),(user8, address3),(user8, address4),]) + + q = sess.query(User, AdAlias).select_from(join(AdAlias, User, AdAlias.user)).filter(User.name=='ed') + self.assertEquals(l.all(), [(user8, address2),(user8, address3),(user8, address4),]) + + def test_implicit_joins_from_aliases(self): + sess = create_session() + OrderAlias = aliased(Order) + + self.assertEquals( + sess.query(OrderAlias).join('items').filter_by(description='item 3').all(), + [ + Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1), + Order(address_id=4,description=u'order 2',isopen=0,user_id=9,id=2), + Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3) + ] + ) + + self.assertEquals( + sess.query(User, OrderAlias, Item.description).join(('orders', OrderAlias), 'items').filter_by(description='item 3').all(), + [ + (User(name=u'jack',id=7), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1), u'item 3'), + (User(name=u'jack',id=7), Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3), u'item 3'), + (User(name=u'fred',id=9), Order(address_id=4,description=u'order 2',isopen=0,user_id=9,id=2), u'item 3') + ] + ) + def test_aliased_classes_m2m(self): sess = create_session() @@ -725,20 +823,6 @@ class JoinTest(QueryTest): ] ) - def test_generative_join(self): - # test that alised_ids is copied - sess = create_session() - q = sess.query(User).add_entity(Address) - q1 = q.join('addresses', aliased=True) - q2 = q.join('addresses', aliased=True) - q3 = q2.join('addresses', aliased=True) - q4 = q2.join('addresses', aliased=True, id='someid') - q5 = q2.join('addresses', aliased=True, id='someid') - q6 = q5.join('addresses', aliased=True, id='someid') - assert q1._alias_ids[class_mapper(Address)] != q2._alias_ids[class_mapper(Address)] - assert q2._alias_ids[class_mapper(Address)] != q3._alias_ids[class_mapper(Address)] - assert q4._alias_ids['someid'] != q5._alias_ids['someid'] - def test_reset_joinpoint(self): for aliased in (True, False): # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack) @@ -779,43 +863,19 @@ class JoinTest(QueryTest): assert q.count() == 1 assert [User(id=7)] == q.all() + # test the control version - same joins but not aliased. rows are not returned because order 3 does not have item 1 - # addtionally by placing this test after the previous one, test that the "aliasing" step does not corrupt the - # join clauses that are cached by the relationship. - q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Order.description=="item 1") + q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Item.description=="item 1") assert [] == q.all() assert q.count() == 0 q = sess.query(User).join('orders', aliased=True).filter(Order.items.any(Item.description=='item 4')) assert [User(id=7)] == q.all() - - def test_aliased_add_entity(self): - """test the usage of aliased joins with add_entity()""" - sess = create_session() - q = sess.query(User).join('orders', aliased=True, id='order1').filter(Order.description=="order 3").join(['orders', 'items'], aliased=True, id='item1').filter(Item.description=="item 1") - - try: - q.add_entity(Order, id='fakeid').compile() - assert False - except exceptions.InvalidRequestError, e: - assert str(e) == "Query has no alias identified by 'fakeid'" - - try: - q.add_entity(Order, id='fakeid').instances(None) - assert False - except exceptions.InvalidRequestError, e: - assert str(e) == "Query has no alias identified by 'fakeid'" - - q = q.add_entity(Order, id='order1').add_entity(Item, id='item1') + + # test that aliasing gets reset when join() is called + q = sess.query(User).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', aliased=True).filter(Order.description=="order 5") assert q.count() == 1 - assert [(User(id=7), Order(description='order 3'), Item(description='item 1'))] == q.all() - - q = sess.query(User).add_entity(Order).join('orders', aliased=True).filter(Order.description=="order 3").join('orders', aliased=True).filter(Order.description=='order 4') - try: - q.compile() - assert False - except exceptions.InvalidRequestError, e: - assert str(e) == "Ambiguous join for entity 'Mapper|Order|orders'; specify id=<someid> to query.join()/query.add_entity()" + assert [User(id=7)] == q.all() class MultiplePathTest(ORMTest): def define_tables(self, metadata): @@ -849,11 +909,10 @@ class MultiplePathTest(ORMTest): }) mapper(T2, t2) - try: - create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint().join('t2s_2') - assert False - except exceptions.InvalidRequestError, e: - assert str(e) == "Can't join to property 't2s_2'; a path to this table along a different secondary table already exists. Use the `alias=True` argument to `join()`." + q = create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint() + self.assertRaisesMessage(sa_exc.InvalidRequestError, "a path to this table along a different secondary table already exists.", + q.join, 't2s_2' + ) create_session().query(T1).join('t2s_1', aliased=True).filter(t2.c.id==5).reset_joinpoint().join('t2s_2').all() create_session().query(T1).join('t2s_1').filter(t2.c.id==5).reset_joinpoint().join('t2s_2', aliased=True).all() @@ -926,26 +985,34 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): assert fixtures.user_address_result == l self.assert_sql_count(testing.db, go, 1) + # better way. use select_from() + def go(): + l = sess.query(User).select_from(query).options(contains_eager('addresses')).all() + assert fixtures.user_address_result == l + self.assert_sql_count(testing.db, go, 1) + def test_contains_eager(self): sess = create_session() + # test that contains_eager suppresses the normal outer join rendering q = sess.query(User).outerjoin(User.addresses).options(contains_eager(User.addresses)) - self.assert_compile(q.statement, "SELECT addresses.id AS addresses_id, addresses.user_id AS addresses_user_id, " - "addresses.email_address AS addresses_email_address, users.id AS users_id, users.name AS users_name "\ - "FROM users LEFT OUTER JOIN addresses ON users.id = addresses.user_id ORDER BY users.id", dialect=default.DefaultDialect()) - + self.assert_compile(q.with_labels().statement, "SELECT users.id AS users_id, users.name AS users_name, "\ + "addresses.id AS addresses_id, addresses.user_id AS addresses_user_id, "\ + "addresses.email_address AS addresses_email_address FROM users LEFT OUTER JOIN addresses "\ + "ON users.id = addresses.user_id ORDER BY users.id", dialect=default.DefaultDialect()) + def go(): assert fixtures.user_address_result == q.all() self.assert_sql_count(testing.db, go, 1) sess.clear() - + adalias = addresses.alias() q = sess.query(User).select_from(users.outerjoin(adalias)).options(contains_eager(User.addresses, alias=adalias)) def go(): assert fixtures.user_address_result == q.all() self.assert_sql_count(testing.db, go, 1) sess.clear() - + selectquery = users.outerjoin(addresses).select(users.c.id<10, use_labels=True, order_by=[users.c.id, addresses.c.id]) q = sess.query(User) @@ -956,6 +1023,13 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): sess.clear() + + def go(): + l = q.options(contains_eager(User.addresses)).instances(selectquery.execute()) + assert fixtures.user_address_result[0:3] == l + self.assert_sql_count(testing.db, go, 1) + sess.clear() + def go(): l = q.options(contains_eager('addresses')).from_statement(selectquery).all() assert fixtures.user_address_result[0:3] == l @@ -966,38 +1040,34 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): selectquery = users.outerjoin(adalias).select(use_labels=True, order_by=[users.c.id, adalias.c.id]) sess = create_session() q = sess.query(User) - + + # string alias name def go(): - # test using a string alias name l = q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute()) assert fixtures.user_address_result == l self.assert_sql_count(testing.db, go, 1) sess.clear() + # expression.Alias object def go(): - # test using the Alias object itself l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute()) assert fixtures.user_address_result == l self.assert_sql_count(testing.db, go, 1) sess.clear() - def decorate(row): - d = {} - for c in addresses.c: - d[c] = row[adalias.corresponding_column(c)] - return d - + # Aliased object + adalias = aliased(Address) def go(): - # test using a custom 'decorate' function - l = q.options(contains_eager('addresses', decorator=decorate)).instances(selectquery.execute()) - assert fixtures.user_address_result == l + l = q.options(contains_eager('addresses', alias=adalias)).outerjoin((adalias, User.addresses)).order_by(User.id, adalias.id) + assert fixtures.user_address_result == l.all() self.assert_sql_count(testing.db, go, 1) sess.clear() + oalias = orders.alias('o1') ialias = items.alias('i1') - query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id).order_by(oalias.c.id).order_by(ialias.c.id) + query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id, oalias.c.id, ialias.c.id) q = create_session().query(User) # test using string alias with more than one level deep def go(): @@ -1014,9 +1084,24 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.assert_sql_count(testing.db, go, 1) sess.clear() + # test using Aliased with more than one level deep + oalias = aliased(Order) + ialias = aliased(Item) + def go(): + l = q.options(contains_eager(User.orders, alias=oalias), contains_eager(User.orders, Order.items, alias=ialias)).\ + outerjoin((oalias, User.orders), (ialias, Order.items)).order_by(User.id, oalias.id, ialias.id) + assert fixtures.user_order_result == l.all() + self.assert_sql_count(testing.db, go, 1) + sess.clear() + + +class MixedEntitiesTest(QueryTest): + def test_values(self): sess = create_session() + assert list(sess.query(User).values()) == list() + sel = users.select(User.id.in_([7, 8])).alias() q = sess.query(User) q2 = q.select_from(sel).values(User.name) @@ -1035,19 +1120,166 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(desc(Address.email_address))[1:3].values(User.name, Address.email_address) self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@lala.com')]) - q2 = q.join('addresses', aliased=True).filter(User.name.like('%e%')).values(User.name, Address.email_address) + adalias = aliased(Address) + q2 = q.join(('addresses', adalias)).filter(User.name.like('%e%')).values(User.name, adalias.email_address) self.assertEquals(list(q2), [(u'ed', u'ed@wood.com'), (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), (u'fred', u'fred@fred.com')]) q2 = q.values(func.count(User.name)) assert q2.next() == (4,) - u2 = users.alias() - q2 = q.select_from(sel).filter(u2.c.id>1).order_by([users.c.id, sel.c.id, u2.c.id]).values(users.c.name, sel.c.name, u2.c.name) + u2 = aliased(User) + q2 = q.select_from(sel).filter(u2.id>1).order_by([User.id, sel.c.id, u2.id]).values(User.name, sel.c.name, u2.name) self.assertEquals(list(q2), [(u'jack', u'jack', u'jack'), (u'jack', u'jack', u'ed'), (u'jack', u'jack', u'fred'), (u'jack', u'jack', u'chuck'), (u'ed', u'ed', u'jack'), (u'ed', u'ed', u'ed'), (u'ed', u'ed', u'fred'), (u'ed', u'ed', u'chuck')]) - q2 = q.select_from(sel).filter(users.c.id>1).values(users.c.name, sel.c.name, User.name) - self.assertEquals(list(q2), [(u'jack', u'jack', u'jack'), (u'ed', u'ed', u'ed')]) + q2 = q.select_from(sel).filter(User.id==8).values(User.name, sel.c.name, User.name) + self.assertEquals(list(q2), [(u'ed', u'ed', u'ed')]) + + # using User.xxx is alised against "sel", so this query returns nothing + q2 = q.select_from(sel).filter(User.id==8).filter(User.id>sel.c.id).values(User.name, sel.c.name, User.name) + self.assertEquals(list(q2), []) + + # whereas this uses users.c.xxx, is not aliased and creates a new join + q2 = q.select_from(sel).filter(users.c.id==8).filter(users.c.id>sel.c.id).values(users.c.name, sel.c.name, User.name) + self.assertEquals(list(q2), [(u'ed', u'jack', u'jack')]) + def test_tuple_labeling(self): + sess = create_session() + for row in sess.query(User, Address).join(User.addresses).all(): + self.assertEquals(set(row.keys()), set(['User', 'Address'])) + self.assertEquals(row.User, row[0]) + self.assertEquals(row.Address, row[1]) + + for row in sess.query(User.name, User.id.label('foobar')): + self.assertEquals(set(row.keys()), set(['name', 'foobar'])) + self.assertEquals(row.name, row[0]) + self.assertEquals(row.foobar, row[1]) + + for row in sess.query(User).values(User.name, User.id.label('foobar')): + self.assertEquals(set(row.keys()), set(['name', 'foobar'])) + self.assertEquals(row.name, row[0]) + self.assertEquals(row.foobar, row[1]) + + oalias = aliased(Order) + for row in sess.query(User, oalias).join(User.orders).all(): + self.assertEquals(set(row.keys()), set(['User'])) + self.assertEquals(row.User, row[0]) + + oalias = aliased(Order, name='orders') + for row in sess.query(User, oalias).join(User.orders).all(): + self.assertEquals(set(row.keys()), set(['User', 'orders'])) + self.assertEquals(row.User, row[0]) + self.assertEquals(row.orders, row[1]) + + + def test_column_queries(self): + sess = create_session() + + self.assertEquals(sess.query(User.name).all(), [(u'jack',), (u'ed',), (u'fred',), (u'chuck',)]) + + sel = users.select(User.id.in_([7, 8])).alias() + q = sess.query(User.name) + q2 = q.select_from(sel).all() + self.assertEquals(list(q2), [(u'jack',), (u'ed',)]) + + self.assertEquals(sess.query(User.name, Address.email_address).filter(User.id==Address.user_id).all(), [ + (u'jack', u'jack@bean.com'), (u'ed', u'ed@wood.com'), + (u'ed', u'ed@bettyboop.com'), (u'ed', u'ed@lala.com'), + (u'fred', u'fred@fred.com') + ]) + + self.assertEquals(sess.query(User.name, func.count(Address.email_address)).outerjoin(User.addresses).group_by(User.id, User.name).order_by(User.id).all(), + [(u'jack', 1), (u'ed', 3), (u'fred', 1), (u'chuck', 0)] + ) + + self.assertEquals(sess.query(User, func.count(Address.email_address)).outerjoin(User.addresses).group_by(User).order_by(User.id).all(), + [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)] + ) + + self.assertEquals(sess.query(func.count(Address.email_address), User).outerjoin(User.addresses).group_by(User).order_by(User.id).all(), + [(1, User(name='jack',id=7)), (3, User(name='ed',id=8)), (1, User(name='fred',id=9)), (0, User(name='chuck',id=10))] + ) + + adalias = aliased(Address) + self.assertEquals(sess.query(User, func.count(adalias.email_address)).outerjoin(('addresses', adalias)).group_by(User).order_by(User.id).all(), + [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)] + ) + + self.assertEquals(sess.query(func.count(adalias.email_address), User).outerjoin((User.addresses, adalias)).group_by(User).order_by(User.id).all(), + [(1, User(name=u'jack',id=7)), (3, User(name=u'ed',id=8)), (1, User(name=u'fred',id=9)), (0, User(name=u'chuck',id=10))] + ) + + # select from aliasing + explicit aliasing + self.assertEquals( + sess.query(User, adalias.email_address, adalias.id).outerjoin((User.addresses, adalias)).from_self(User, adalias.email_address).order_by(User.id, adalias.id).all(), + [ + (User(name=u'jack',id=7), u'jack@bean.com'), + (User(name=u'ed',id=8), u'ed@wood.com'), + (User(name=u'ed',id=8), u'ed@bettyboop.com'), + (User(name=u'ed',id=8), u'ed@lala.com'), + (User(name=u'fred',id=9), u'fred@fred.com'), + (User(name=u'chuck',id=10), None) + ] + ) + + # anon + select from aliasing + self.assertEquals( + sess.query(User).join(User.addresses, aliased=True).filter(Address.email_address.like('%ed%')).from_self().all(), + [ + User(name=u'ed',id=8), + User(name=u'fred',id=9), + ] + ) + + # test eager aliasing, with/without select_from aliasing + for q in [ + sess.query(User, adalias.email_address).outerjoin((User.addresses, adalias)).options(eagerload(User.addresses)).order_by(User.id, adalias.id).limit(10), + sess.query(User, adalias.email_address, adalias.id).outerjoin((User.addresses, adalias)).from_self(User, adalias.email_address).options(eagerload(User.addresses)).order_by(User.id, adalias.id).limit(10), + ]: + self.assertEquals( + q.all(), + [(User(addresses=[Address(user_id=7,email_address=u'jack@bean.com',id=1)],name=u'jack',id=7), u'jack@bean.com'), + (User(addresses=[ + Address(user_id=8,email_address=u'ed@wood.com',id=2), + Address(user_id=8,email_address=u'ed@bettyboop.com',id=3), + Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@wood.com'), + (User(addresses=[ + Address(user_id=8,email_address=u'ed@wood.com',id=2), + Address(user_id=8,email_address=u'ed@bettyboop.com',id=3), + Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@bettyboop.com'), + (User(addresses=[ + Address(user_id=8,email_address=u'ed@wood.com',id=2), + Address(user_id=8,email_address=u'ed@bettyboop.com',id=3), + Address(user_id=8,email_address=u'ed@lala.com',id=4)],name=u'ed',id=8), u'ed@lala.com'), + (User(addresses=[Address(user_id=9,email_address=u'fred@fred.com',id=5)],name=u'fred',id=9), u'fred@fred.com'), + + (User(addresses=[],name=u'chuck',id=10), None)] + ) + + def test_self_referential(self): + + sess = create_session() + oalias = aliased(Order) + + for q in [ + sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id), + sess.query(Order, oalias)._from_self().filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id), + # here we go....two layers of aliasing + sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id)._from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)), + + # gratuitous four layers + sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).filter(Order.id>oalias.id)._from_self()._from_self()._from_self().order_by(Order.id, oalias.id).limit(10).options(eagerload(Order.items)), + + ]: + + self.assertEquals( + q.all(), + [ + (Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1)), + (Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5), Order(address_id=1,description=u'order 1',isopen=0,user_id=7,id=1)), + (Order(address_id=None,description=u'order 5',isopen=0,user_id=7,id=5), Order(address_id=1,description=u'order 3',isopen=1,user_id=7,id=3)) + ] + ) + def test_multi_mappers(self): test_session = create_session() @@ -1055,7 +1287,6 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): (user7, user8, user9, user10) = test_session.query(User).all() (address1, address2, address3, address4, address5) = test_session.query(Address).all() - # note the result is a cartesian product expected = [(user7, address1), (user8, address2), (user8, address3), @@ -1066,30 +1297,24 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): sess = create_session() selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id]) - q = sess.query(User) - l = q.instances(selectquery.execute(), Address) - assert l == expected - + self.assertEquals(sess.query(User, Address).instances(selectquery.execute()), expected) sess.clear() - for aliased in (False, True): - q = sess.query(User) - - q = q.add_entity(Address).outerjoin('addresses', aliased=aliased) - l = q.all() - assert l == expected + for address_entity in (Address, aliased(Address)): + q = sess.query(User).add_entity(address_entity).outerjoin(('addresses', address_entity)).order_by(User.id, address_entity.id) + self.assertEquals(q.all(), expected) sess.clear() - q = sess.query(User).add_entity(Address) - l = q.join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com').all() - assert l == [(user8, address3)] + q = sess.query(User).add_entity(address_entity) + q = q.join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com') + self.assertEquals(q.all(), [(user8, address3)]) sess.clear() - q = sess.query(User, Address).join('addresses', aliased=aliased).filter_by(email_address='ed@bettyboop.com') - assert q.all() == [(user8, address3)] + q = sess.query(User, address_entity).join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com') + self.assertEquals(q.all(), [(user8, address3)]) sess.clear() - q = sess.query(User, Address).join('addresses', aliased=aliased).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com') + q = sess.query(User, address_entity).join(('addresses', address_entity)).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com') self.assertEquals(list(util.OrderedSet(q.all())), [(user8, address3)]) sess.clear() @@ -1123,18 +1348,12 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): expected = [(u, u.name) for u in sess.query(User).all()] - for add_col in (User.name, users.c.name, User.c.name): + for add_col in (User.name, users.c.name): assert sess.query(User).add_column(add_col).all() == expected sess.clear() - self.assertRaises(exceptions.InvalidRequestError, sess.query(User).add_column, object()) + self.assertRaises(sa_exc.InvalidRequestError, sess.query(User).add_column, object()) - def test_ambiguous_column(self): - sess = create_session() - - q = sess.query(User).join('addresses', aliased=True).join('addresses', aliased=True).add_column(Address.id) - self.assertRaises(exceptions.InvalidRequestError, iter, q) - def test_multi_columns_2(self): """test aliased/nonalised joins with the usage of add_column()""" sess = create_session() @@ -1146,12 +1365,16 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): (user10, 0) ] - for aliased in (False, True): - q = sess.query(User) - q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses', aliased=aliased).add_column(func.count(Address.id).label('count')) - l = q.all() - assert l == expected - sess.clear() + q = sess.query(User) + q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses').add_column(func.count(Address.id).label('count')) + self.assertEquals(q.all(), expected) + sess.clear() + + adalias = aliased(Address) + q = sess.query(User) + q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin(('addresses', adalias)).add_column(func.count(adalias.id).label('count')) + self.assertEquals(q.all(), expected) + sess.clear() s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id) q = sess.query(User) @@ -1159,7 +1382,7 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): assert l == expected - def test_two_columns(self): + def test_raw_columns(self): sess = create_session() (user7, user8, user9, user10) = sess.query(User).all() expected = [ @@ -1168,8 +1391,9 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): (user9, 1, "Name:fred"), (user10, 0, "Name:chuck")] - q = create_session().query(User).add_column(func.count(addresses.c.id))\ - .add_column(("Name:" + users.c.name)).outerjoin('addresses', aliased=True)\ + adalias = addresses.alias() + q = create_session().query(User).add_column(func.count(adalias.c.id))\ + .add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\ .group_by([c for c in users.c]).order_by(users.c.id) assert q.all() == expected @@ -1190,14 +1414,19 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): assert q.all() == expected sess.clear() - # test with outerjoin() both aliased and non - for aliased in (False, True): - q = create_session().query(User).add_column(func.count(addresses.c.id))\ - .add_column(("Name:" + users.c.name)).outerjoin('addresses', aliased=aliased)\ - .group_by([c for c in users.c]).order_by(users.c.id) + q = create_session().query(User).add_column(func.count(addresses.c.id))\ + .add_column(("Name:" + users.c.name)).outerjoin('addresses')\ + .group_by([c for c in users.c]).order_by(users.c.id) - assert q.all() == expected - sess.clear() + assert q.all() == expected + sess.clear() + + q = create_session().query(User).add_column(func.count(adalias.c.id))\ + .add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\ + .group_by([c for c in users.c]).order_by(users.c.id) + + assert q.all() == expected + sess.clear() class SelectFromTest(QueryTest): @@ -1217,7 +1446,7 @@ class SelectFromTest(QueryTest): self.assertEquals(sess.query(User).select_from(sel).all(), [User(id=7), User(id=8)]) - self.assertEquals(sess.query(User).select_from(sel).filter(User.c.id==8).all(), [User(id=8)]) + self.assertEquals(sess.query(User).select_from(sel).filter(User.id==8).all(), [User(id=8)]) self.assertEquals(sess.query(User).select_from(sel).order_by(desc(User.name)).all(), [ User(name='jack',id=7), User(name='ed',id=8) @@ -1273,7 +1502,8 @@ class SelectFromTest(QueryTest): ] ) - self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(), + adalias = aliased(Address) + self.assertEquals(sess.query(User).select_from(sel).join(('addresses', adalias)).add_entity(adalias).order_by(User.id).order_by(adalias.id).all(), [ (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), @@ -1297,12 +1527,15 @@ class SelectFromTest(QueryTest): sel = users.select(users.c.id.in_([7, 8])) sess = create_session() + + # TODO: remove + sess.query(User).select_from(sel).options(eagerload_all('orders.items.keywords')).join('orders', 'items', 'keywords', aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all() - self.assertEquals(sess.query(User).select_from(sel).join(['orders', 'items', 'keywords']).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [ + self.assertEquals(sess.query(User).select_from(sel).join('orders', 'items', 'keywords').filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [ User(name=u'jack',id=7) ]) - self.assertEquals(sess.query(User).select_from(sel).join(['orders', 'items', 'keywords'], aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [ + self.assertEquals(sess.query(User).select_from(sel).join('orders', 'items', 'keywords', aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [ User(name=u'jack',id=7) ]) @@ -1355,7 +1588,7 @@ class SelectFromTest(QueryTest): sess.clear() def go(): - self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(), + self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.id==8).all(), [User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])] ) self.assert_sql_count(testing.db, go, 1) @@ -1364,7 +1597,7 @@ class SelectFromTest(QueryTest): def go(): self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel)[1], User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])) self.assert_sql_count(testing.db, go, 1) - + class CustomJoinTest(QueryTest): keep_mappers = False @@ -1428,6 +1661,10 @@ class SelfReferentialTest(ORMTest): node = sess.query(Node).join('children', aliased=True).filter_by(data='n122').first() assert node.data=='n12' + ret = sess.query(Node.data).join(Node.children, aliased=True).filter_by(data='n122').all() + assert ret == [('n12',)] + + node = sess.query(Node).join(['children', 'children'], aliased=True).filter_by(data='n122').first() assert node.data=='n1' @@ -1461,10 +1698,66 @@ class SelfReferentialTest(ORMTest): list(sess.query(Node).select_from(join(Node, n1, 'parent').join(n2, 'parent')).\ filter(and_(Node.data=='n122', n1.data=='n12', n2.data=='n1')).values(Node.data, n1.data, n2.data)), [('n122', 'n12', 'n1')]) + + def test_join_to_nonaliased(self): + sess = create_session() - def test_any(self): + n1 = aliased(Node) + + # using 'n1.parent' implicitly joins to unaliased Node + self.assertEquals( + sess.query(n1).join(n1.parent).filter(Node.data=='n1').all(), + [Node(parent_id=1,data=u'n11',id=2), Node(parent_id=1,data=u'n12',id=3), Node(parent_id=1,data=u'n13',id=4)] + ) + + # explicit (new syntax) + self.assertEquals( + sess.query(n1).join((Node, n1.parent)).filter(Node.data=='n1').all(), + [Node(parent_id=1,data=u'n11',id=2), Node(parent_id=1,data=u'n12',id=3), Node(parent_id=1,data=u'n13',id=4)] + ) + + def test_multiple_explicit_entities(self): sess = create_session() + parent = aliased(Node) + grandparent = aliased(Node) + self.assertEquals( + sess.query(Node, parent, grandparent).\ + join((Node.parent, parent), (parent.parent, grandparent)).\ + filter(Node.data=='n122').filter(parent.data=='n12').\ + filter(grandparent.data=='n1').first(), + (Node(data='n122'), Node(data='n12'), Node(data='n1')) + ) + + self.assertEquals( + sess.query(Node, parent, grandparent).\ + join((Node.parent, parent), (parent.parent, grandparent)).\ + filter(Node.data=='n122').filter(parent.data=='n12').\ + filter(grandparent.data=='n1')._from_self().first(), + (Node(data='n122'), Node(data='n12'), Node(data='n1')) + ) + + self.assertEquals( + sess.query(Node, parent, grandparent).\ + join((Node.parent, parent), (parent.parent, grandparent)).\ + filter(Node.data=='n122').filter(parent.data=='n12').\ + filter(grandparent.data=='n1').\ + options(eagerload(Node.children)).first(), + (Node(data='n122'), Node(data='n12'), Node(data='n1')) + ) + + self.assertEquals( + sess.query(Node, parent, grandparent).\ + join((Node.parent, parent), (parent.parent, grandparent)).\ + filter(Node.data=='n122').filter(parent.data=='n12').\ + filter(grandparent.data=='n1')._from_self().\ + options(eagerload(Node.children)).first(), + (Node(data='n122'), Node(data='n12'), Node(data='n1')) + ) + + + def test_any(self): + sess = create_session() self.assertEquals(sess.query(Node).filter(Node.children.any(Node.data=='n1')).all(), []) self.assertEquals(sess.query(Node).filter(Node.children.any(Node.data=='n12')).all(), [Node(data='n1')]) self.assertEquals(sess.query(Node).filter(~Node.children.any()).all(), [Node(data='n11'), Node(data='n13'),Node(data='n121'),Node(data='n122'),Node(data='n123'),]) @@ -1561,6 +1854,8 @@ class SelfReferentialM2MTest(ORMTest): ) class ExternalColumnsTest(QueryTest): + """test mappers with SQL-expressions added as column properties.""" + keep_mappers = False def setup_mappers(self): @@ -1568,15 +1863,11 @@ class ExternalColumnsTest(QueryTest): def test_external_columns_bad(self): - self.assertRaisesMessage(exceptions.ArgumentError, "not represented in mapper's table", mapper, User, users, properties={ + self.assertRaisesMessage(sa_exc.ArgumentError, "not represented in mapper's table", mapper, User, users, properties={ 'concat': (users.c.id * 2), }) clear_mappers() - self.assertRaisesMessage(exceptions.ArgumentError, "must be given a ColumnElement as its argument.", column_property, - select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users) - ) - def test_external_columns_good(self): """test querying mappings that reference external columns or selectables.""" @@ -1586,19 +1877,21 @@ class ExternalColumnsTest(QueryTest): }) mapper(Address, addresses, properties={ - 'user':relation(User, lazy=True) + 'user':relation(User) }) sess = create_session() - - l = sess.query(User).all() - assert [ - User(id=7, concat=14, count=1), - User(id=8, concat=16, count=3), - User(id=9, concat=18, count=1), - User(id=10, concat=20, count=0), - ] == l + sess.query(Address).options(eagerload('user')).all() + + self.assertEquals(sess.query(User).all(), + [ + User(id=7, concat=14, count=1), + User(id=8, concat=16, count=3), + User(id=9, concat=18, count=1), + User(id=10, concat=20, count=0), + ] + ) address_result = [ Address(id=1, user=User(id=7, concat=14, count=1)), @@ -1617,15 +1910,24 @@ class ExternalColumnsTest(QueryTest): self.assertEquals(sess.query(Address).options(eagerload('user')).all(), address_result) self.assert_sql_count(testing.db, go, 1) - tuple_address_result = [(address, address.user) for address in address_result] - - q =sess.query(Address).join('user', aliased=True, id='ualias').join('user', aliased=True).add_column(User.concat) - self.assertRaisesMessage(exceptions.InvalidRequestError, "Ambiguous", q.all) - - self.assertEquals(sess.query(Address).join('user', aliased=True, id='ualias').add_entity(User, id='ualias').all(), tuple_address_result) + ualias = aliased(User) + self.assertEquals( + sess.query(Address, ualias).join(('user', ualias)).all(), + [(address, address.user) for address in address_result] + ) - self.assertEquals(sess.query(Address).join('user', aliased=True, id='ualias').join('user', aliased=True).\ - add_column(User.concat, id='ualias').add_column(User.count, id='ualias').all(), + self.assertEquals( + sess.query(Address, ualias.count).join(('user', ualias)).join('user', aliased=True).all(), + [ + (Address(id=1), 1), + (Address(id=2), 3), + (Address(id=3), 3), + (Address(id=4), 3), + (Address(id=5), 1) + ] + ) + + self.assertEquals(sess.query(Address, ualias.concat, ualias.count).join(('user', ualias)).join('user', aliased=True).all(), [ (Address(id=1), 14, 1), (Address(id=2), 16, 3), @@ -1635,15 +1937,21 @@ class ExternalColumnsTest(QueryTest): ] ) - self.assertEquals(list(sess.query(Address).join('user').values(Address.id, User.id, User.concat, User.count)), - [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)] + ua = aliased(User) + self.assertEquals(sess.query(Address, ua.concat, ua.count).select_from(join(Address, ua, 'user')).options(eagerload(Address.user)).all(), + [ + (Address(id=1, user=User(id=7, concat=14, count=1)), 14, 1), + (Address(id=2, user=User(id=8, concat=16, count=3)), 16, 3), + (Address(id=3, user=User(id=8, concat=16, count=3)), 16, 3), + (Address(id=4, user=User(id=8, concat=16, count=3)), 16, 3), + (Address(id=5, user=User(id=9, concat=18, count=1)), 18, 1) + ] ) - self.assertEquals(list(sess.query(Address).join('user', aliased=True).values(Address.id, User.id, User.concat, User.count)), + self.assertEquals(list(sess.query(Address).join('user').values(Address.id, User.id, User.concat, User.count)), [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)] ) - ua = aliased(User) self.assertEquals(list(sess.query(Address, ua).select_from(join(Address,ua, 'user')).values(Address.id, ua.id, ua.concat, ua.count)), [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)] ) |
