summaryrefslogtreecommitdiff
path: root/test/sql/test_query.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
commit8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch)
treeae9e27d12c9fbf8297bb90469509e1cb6a206242 /test/sql/test_query.py
parent7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff)
downloadsqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz
merge 0.6 series to trunk.
Diffstat (limited to 'test/sql/test_query.py')
-rw-r--r--test/sql/test_query.py331
1 files changed, 215 insertions, 116 deletions
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 51b933e45..0e3b9dff2 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -1,9 +1,11 @@
+from sqlalchemy.test.testing import eq_
import datetime
from sqlalchemy import *
from sqlalchemy import exc, sql
from sqlalchemy.engine import default
from sqlalchemy.test import *
-from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.testing import eq_, assert_raises_message
+from sqlalchemy.test.schema import Table, Column
class QueryTest(TestBase):
@@ -12,11 +14,11 @@ class QueryTest(TestBase):
global users, users2, addresses, metadata
metadata = MetaData(testing.db)
users = Table('query_users', metadata,
- Column('user_id', INT, primary_key = True),
+ Column('user_id', INT, primary_key=True, test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
addresses = Table('query_addresses', metadata,
- Column('address_id', Integer, primary_key=True),
+ Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('query_users.user_id')),
Column('address', String(30)))
@@ -26,7 +28,8 @@ class QueryTest(TestBase):
)
metadata.create_all()
- def tearDown(self):
+ @engines.close_first
+ def teardown(self):
addresses.delete().execute()
users.delete().execute()
users2.delete().execute()
@@ -52,89 +55,133 @@ class QueryTest(TestBase):
assert users.count().scalar() == 1
users.update(users.c.user_id == 7).execute(user_name = 'fred')
- assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
+ assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred'
def test_lastrow_accessor(self):
- """Tests the last_inserted_ids() and lastrow_has_id() functions."""
+ """Tests the inserted_primary_key and lastrow_has_id() functions."""
- def insert_values(table, values):
+ def insert_values(engine, table, values):
"""
Inserts a row into a table, returns the full list of values
INSERTed including defaults that fired off on the DB side and
detects rows that had defaults and post-fetches.
"""
- result = table.insert().execute(**values)
+ result = engine.execute(table.insert(), **values)
ret = values.copy()
- for col, id in zip(table.primary_key, result.last_inserted_ids()):
+ for col, id in zip(table.primary_key, result.inserted_primary_key):
ret[col.key] = id
if result.lastrow_has_defaults():
- criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())])
- row = table.select(criterion).execute().fetchone()
+ criterion = and_(*[col==id for col, id in zip(table.primary_key, result.inserted_primary_key)])
+ row = engine.execute(table.select(criterion)).first()
for c in table.c:
ret[c.key] = row[c]
return ret
- for supported, table, values, assertvalues in [
- (
- {'unsupported':['sqlite']},
- Table("t1", metadata,
- Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True),
- Column('foo', String(30), primary_key=True)),
- {'foo':'hi'},
- {'id':1, 'foo':'hi'}
- ),
- (
- {'unsupported':['sqlite']},
- Table("t2", metadata,
- Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
+ if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
+ test_engines = [
+ engines.testing_engine(options={'implicit_returning':False}),
+ engines.testing_engine(options={'implicit_returning':True}),
+ ]
+ else:
+ test_engines = [testing.db]
+
+ for engine in test_engines:
+ metadata = MetaData()
+ for supported, table, values, assertvalues in [
+ (
+ {'unsupported':['sqlite']},
+ Table("t1", metadata,
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('foo', String(30), primary_key=True)),
+ {'foo':'hi'},
+ {'id':1, 'foo':'hi'}
),
- {'foo':'hi'},
- {'id':1, 'foo':'hi', 'bar':'hi'}
- ),
- (
- {'unsupported':[]},
- Table("t3", metadata,
- Column("id", String(40), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column("bar", String(30))
+ (
+ {'unsupported':['sqlite']},
+ Table("t2", metadata,
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('foo', String(30), primary_key=True),
+ Column('bar', String(30), server_default='hi')
),
- {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"},
- {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}
- ),
- (
- {'unsupported':[]},
- Table("t4", metadata,
- Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
+ {'foo':'hi'},
+ {'id':1, 'foo':'hi', 'bar':'hi'}
),
- {'foo':'hi', 'id':1},
- {'id':1, 'foo':'hi', 'bar':'hi'}
- ),
- (
- {'unsupported':[]},
- Table("t5", metadata,
- Column('id', String(10), primary_key=True),
- Column('bar', String(30), server_default='hi')
+ (
+ {'unsupported':[]},
+ Table("t3", metadata,
+ Column("id", String(40), primary_key=True),
+ Column('foo', String(30), primary_key=True),
+ Column("bar", String(30))
+ ),
+ {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"},
+ {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}
),
- {'id':'id1'},
- {'id':'id1', 'bar':'hi'},
- ),
- ]:
- if testing.db.name in supported['unsupported']:
- continue
- try:
- table.create()
- i = insert_values(table, values)
- assert i == assertvalues, repr(i) + " " + repr(assertvalues)
- finally:
- table.drop()
+ (
+ {'unsupported':[]},
+ Table("t4", metadata,
+ Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
+ Column('foo', String(30), primary_key=True),
+ Column('bar', String(30), server_default='hi')
+ ),
+ {'foo':'hi', 'id':1},
+ {'id':1, 'foo':'hi', 'bar':'hi'}
+ ),
+ (
+ {'unsupported':[]},
+ Table("t5", metadata,
+ Column('id', String(10), primary_key=True),
+ Column('bar', String(30), server_default='hi')
+ ),
+ {'id':'id1'},
+ {'id':'id1', 'bar':'hi'},
+ ),
+ ]:
+ if testing.db.name in supported['unsupported']:
+ continue
+ try:
+ table.create(bind=engine, checkfirst=True)
+ i = insert_values(engine, table, values)
+ assert i == assertvalues, "tablename: %s %r %r" % (table.name, repr(i), repr(assertvalues))
+ finally:
+ table.drop(bind=engine)
+
+ @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks")
+ def test_misordered_lastrow(self):
+ related = Table('related', metadata,
+ Column('id', Integer, primary_key=True)
+ )
+ t6 = Table("t6", metadata,
+ Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True),
+ Column('auto_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ )
+ metadata.create_all()
+ r = related.insert().values(id=12).execute()
+ id = r.inserted_primary_key[0]
+ assert id==12
+
+ r = t6.insert().values(manual_id=id).execute()
+ eq_(r.inserted_primary_key, [12, 1])
+
+ def test_autoclose_on_insert(self):
+ if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
+ test_engines = [
+ engines.testing_engine(options={'implicit_returning':False}),
+ engines.testing_engine(options={'implicit_returning':True}),
+ ]
+ else:
+ test_engines = [testing.db]
+
+ for engine in test_engines:
+
+ r = engine.execute(users.insert(),
+ {'user_name':'jack'},
+ )
+ assert r.closed
+
def test_row_iteration(self):
users.insert().execute(
{'user_id':7, 'user_name':'jack'},
@@ -147,7 +194,7 @@ class QueryTest(TestBase):
l.append(row)
self.assert_(len(l) == 3)
- @testing.fails_on('firebird', 'Data type unknown')
+ @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
@testing.requires.subqueries
def test_anonymous_rows(self):
users.insert().execute(
@@ -161,6 +208,7 @@ class QueryTest(TestBase):
assert row['anon_1'] == 8
assert row['anon_2'] == 10
+ @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
def test_order_by_label(self):
"""test that a label within an ORDER BY works on each backend.
@@ -179,6 +227,11 @@ class QueryTest(TestBase):
select([concat]).order_by(concat).execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
+
+ eq_(
+ select([concat]).order_by(concat).execute().fetchall(),
+ [("test: ed",), ("test: fred",), ("test: jack",)]
+ )
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
@@ -195,7 +248,7 @@ class QueryTest(TestBase):
def test_row_comparison(self):
users.insert().execute(user_id = 7, user_name = 'jack')
- rp = users.select().execute().fetchone()
+ rp = users.select().execute().first()
self.assert_(rp == rp)
self.assert_(not(rp != rp))
@@ -207,8 +260,7 @@ class QueryTest(TestBase):
self.assert_(not (rp != equal))
self.assert_(not (equal != equal))
- @testing.fails_on('mssql', 'No support for boolean logic in column select.')
- @testing.fails_on('oracle', 'FIXME: unknown')
+ @testing.requires.boolean_col_expressions
def test_or_and_as_columns(self):
true, false = literal(True), literal(False)
@@ -218,11 +270,11 @@ class QueryTest(TestBase):
eq_(testing.db.execute(select([or_(false, false)])).scalar(), False)
eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True)
- row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone()
+ row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first()
assert row.x == False
assert row.y == False
- row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone()
+ row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first()
assert row.x == True
assert row.y == False
@@ -253,6 +305,9 @@ class QueryTest(TestBase):
eq_(expr.execute().fetchall(), result)
+ @testing.fails_on("firebird", "see dialect.test_firebird:MiscTest.test_percents_in_text")
+ @testing.fails_on("oracle", "neither % nor %% are accepted")
+ @testing.fails_on("+pg8000", "can't interpret result column from '%%'")
@testing.emits_warning('.*now automatically escapes.*')
def test_percents_in_text(self):
for expr, result in (
@@ -277,7 +332,7 @@ class QueryTest(TestBase):
eq_(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )])
- if testing.against('postgres'):
+ if testing.against('postgresql'):
eq_(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )])
eq_(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), [])
@@ -373,7 +428,7 @@ class QueryTest(TestBase):
s = select([datetable.alias('x').c.today]).as_scalar()
s2 = select([datetable.c.id, s.label('somelabel')])
#print s2.c.somelabel.type
- assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
+ assert isinstance(s2.execute().first()['somelabel'], datetime.datetime)
finally:
datetable.drop()
@@ -444,45 +499,58 @@ class QueryTest(TestBase):
users.insert().execute(user_id=2, user_name='jack')
addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
- r = users.select(users.c.user_id==2).execute().fetchone()
+ r = users.select(users.c.user_id==2).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone()
+
+ r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
+
# test slices
- r = text("select * from query_addresses", bind=testing.db).execute().fetchone()
+ r = text("select * from query_addresses", bind=testing.db).execute().first()
self.assert_(r[0:1] == (1,))
self.assert_(r[1:] == (2, 'foo@bar.com'))
self.assert_(r[:-1] == (1, 2))
-
+
# test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description
r = text("select query_users.user_id, query_users.user_name from query_users "
- "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone()
+ "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().first()
self.assert_(r['user_id']) == 1
self.assert_(r['user_name']) == "john"
# test using literal tablename.colname
- r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone()
+ r = text('select query_users.user_id AS "query_users.user_id", '
+ 'query_users.user_name AS "query_users.user_name" from query_users',
+ bind=testing.db).execute().first()
self.assert_(r['query_users.user_id']) == 1
self.assert_(r['query_users.user_name']) == "john"
# unary experssions
- r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().fetchone()
+ r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().first()
eq_(r[users.c.user_name], 'jack')
eq_(r.user_name, 'jack')
- r.close()
+
+ def test_result_case_sensitivity(self):
+ """test name normalization for result sets."""
+ row = testing.db.execute(
+ select([
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive")
+ ])
+ ).first()
+
+ assert row.keys() == ["case_insensitive", "CaseSensitive"]
+
def test_row_as_args(self):
users.insert().execute(user_id=1, user_name='john')
- r = users.select(users.c.user_id==1).execute().fetchone()
+ r = users.select(users.c.user_id==1).execute().first()
users.delete().execute()
users.insert().execute(r)
- assert users.select().execute().fetchall() == [(1, 'john')]
-
+ eq_(users.select().execute().fetchall(), [(1, 'john')])
+
def test_result_as_args(self):
users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')])
r = users.select().execute()
@@ -496,13 +564,12 @@ class QueryTest(TestBase):
def test_ambiguous_column(self):
users.insert().execute(user_id=1, user_name='john')
- r = users.outerjoin(addresses).select().execute().fetchone()
- try:
- print r['user_id']
- assert False
- except exc.InvalidRequestError, e:
- assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
- str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
+ r = users.outerjoin(addresses).select().execute().first()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r['user_id']
+ )
@testing.requires.subqueries
def test_column_label_targeting(self):
@@ -512,31 +579,29 @@ class QueryTest(TestBase):
users.select().alias('foo'),
users.select().alias(users.name),
):
- row = s.select(use_labels=True).execute().fetchone()
+ row = s.select(use_labels=True).execute().first()
assert row[s.c.user_id] == 7
assert row[s.c.user_name] == 'ed'
def test_keys(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
+ r = users.select().execute().first()
eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
def test_items(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
+ r = users.select().execute().first()
eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
def test_len(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
+ r = users.select().execute().first()
eq_(len(r), 2)
- r.close()
- r = testing.db.execute('select user_name, user_id from query_users').fetchone()
+
+ r = testing.db.execute('select user_name, user_id from query_users').first()
eq_(len(r), 2)
- r.close()
- r = testing.db.execute('select user_name from query_users').fetchone()
+ r = testing.db.execute('select user_name from query_users').first()
eq_(len(r), 1)
- r.close()
def test_cant_execute_join(self):
try:
@@ -549,7 +614,7 @@ class QueryTest(TestBase):
def test_column_order_with_simple_query(self):
# should return values in column definition order
users.insert().execute(user_id=1, user_name='foo')
- r = users.select(users.c.user_id==1).execute().fetchone()
+ r = users.select(users.c.user_id==1).execute().first()
eq_(r[0], 1)
eq_(r[1], 'foo')
eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
@@ -558,7 +623,7 @@ class QueryTest(TestBase):
def test_column_order_with_text_query(self):
# should return values in query order
users.insert().execute(user_id=1, user_name='foo')
- r = testing.db.execute('select user_name, user_id from query_users').fetchone()
+ r = testing.db.execute('select user_name, user_id from query_users').first()
eq_(r[0], 'foo')
eq_(r[1], 1)
eq_([x.lower() for x in r.keys()], ['user_name', 'user_id'])
@@ -580,7 +645,7 @@ class QueryTest(TestBase):
shadowed.create(checkfirst=True)
try:
shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
- r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
+ r = shadowed.select(shadowed.c.shadow_id==1).execute().first()
self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
@@ -622,13 +687,13 @@ class QueryTest(TestBase):
# Null values are not outside any set
assert len(r) == 0
- u = bindparam('search_key')
+ @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
+ def test_bind_in(self):
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 9, user_name = None)
- s = users.select(u.in_([]))
- r = s.execute(search_key='john').fetchall()
- assert len(r) == 0
- r = s.execute(search_key=None).fetchall()
- assert len(r) == 0
+ u = bindparam('search_key')
s = users.select(not_(u.in_([])))
r = s.execute(search_key='john').fetchall()
@@ -660,14 +725,15 @@ class QueryTest(TestBase):
class PercentSchemaNamesTest(TestBase):
"""tests using percent signs, spaces in table and column names.
- Doesn't pass for mysql, postgres, but this is really a
+ Doesn't pass for mysql, postgresql, but this is really a
SQLAlchemy bug - we should be escaping out %% signs for this
operation the same way we do for text() and column labels.
"""
+
@classmethod
@testing.crashes('mysql', 'mysqldb calls name % (params)')
- @testing.crashes('postgres', 'postgres calls name % (params)')
+ @testing.crashes('postgresql', 'postgresql calls name % (params)')
def setup_class(cls):
global percent_table, metadata
metadata = MetaData(testing.db)
@@ -680,12 +746,12 @@ class PercentSchemaNamesTest(TestBase):
@classmethod
@testing.crashes('mysql', 'mysqldb calls name % (params)')
- @testing.crashes('postgres', 'postgres calls name % (params)')
+ @testing.crashes('postgresql', 'postgresql calls name % (params)')
def teardown_class(cls):
metadata.drop_all()
@testing.crashes('mysql', 'mysqldb calls name % (params)')
- @testing.crashes('postgres', 'postgres calls name % (params)')
+ @testing.crashes('postgresql', 'postgresql calls name % (params)')
def test_roundtrip(self):
percent_table.insert().execute(
{'percent%':5, '%(oneofthese)s':7, 'spaces % more spaces':12},
@@ -731,7 +797,7 @@ class PercentSchemaNamesTest(TestBase):
percent_table.update().values({percent_table.c['%(oneofthese)s']:9, percent_table.c['spaces % more spaces']:15}).execute()
eq_(
- percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute().fetchall(),
+ percent_table.select().order_by(percent_table.c['percent%']).execute().fetchall(),
[
(5, 9, 15),
(7, 9, 15),
@@ -852,7 +918,11 @@ class CompoundTest(TestBase):
dict(col2="t3col2r2", col3="bbb", col4="aaa"),
dict(col2="t3col2r3", col3="ccc", col4="bbb"),
])
-
+
+ @engines.close_first
+ def teardown(self):
+ pass
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -878,6 +948,7 @@ class CompoundTest(TestBase):
found2 = self._fetchall_sorted(u.alias('bar').select().execute())
eq_(found2, wanted)
+ @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs")
def test_union_ordered(self):
(s1, s2) = (
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
@@ -891,6 +962,7 @@ class CompoundTest(TestBase):
('ccc', 'aaa')]
eq_(u.execute().fetchall(), wanted)
+ @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs")
@testing.fails_on('maxdb', 'FIXME: unknown')
@testing.requires.subqueries
def test_union_ordered_alias(self):
@@ -907,6 +979,7 @@ class CompoundTest(TestBase):
eq_(u.alias('bar').select().execute().fetchall(), wanted)
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
+ @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery")
@testing.fails_on('mysql', 'FIXME: unknown')
@testing.fails_on('sqlite', 'FIXME: unknown')
def test_union_all(self):
@@ -925,6 +998,29 @@ class CompoundTest(TestBase):
found2 = self._fetchall_sorted(e.alias('foo').select().execute())
eq_(found2, wanted)
+ def test_union_all_lightweight(self):
+ """like test_union_all, but breaks the sub-union into
+ a subquery with an explicit column reference on the outside,
+ more palatable to a wider variety of engines.
+
+ """
+ u = union(
+ select([t1.c.col3]),
+ select([t1.c.col3]),
+ ).alias()
+
+ e = union_all(
+ select([t1.c.col3]),
+ select([u.c.col3])
+ )
+
+ wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)]
+ found1 = self._fetchall_sorted(e.execute())
+ eq_(found1, wanted)
+
+ found2 = self._fetchall_sorted(e.alias('foo').select().execute())
+ eq_(found2, wanted)
+
@testing.crashes('firebird', 'Does not support intersect')
@testing.crashes('sybase', 'FIXME: unknown, verify not fails_on')
@testing.fails_on('mysql', 'FIXME: unknown')
@@ -1330,3 +1426,6 @@ class OperatorTest(TestBase):
order_by=flds.c.idcol).execute().fetchall(),
[(2,),(1,)]
)
+
+
+