diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
| commit | 8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch) | |
| tree | ae9e27d12c9fbf8297bb90469509e1cb6a206242 /test/sql/test_query.py | |
| parent | 7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff) | |
| download | sqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz | |
merge 0.6 series to trunk.
Diffstat (limited to 'test/sql/test_query.py')
| -rw-r--r-- | test/sql/test_query.py | 331 |
1 files changed, 215 insertions, 116 deletions
diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 51b933e45..0e3b9dff2 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -1,9 +1,11 @@ +from sqlalchemy.test.testing import eq_ import datetime from sqlalchemy import * from sqlalchemy import exc, sql from sqlalchemy.engine import default from sqlalchemy.test import * -from sqlalchemy.test.testing import eq_ +from sqlalchemy.test.testing import eq_, assert_raises_message +from sqlalchemy.test.schema import Table, Column class QueryTest(TestBase): @@ -12,11 +14,11 @@ class QueryTest(TestBase): global users, users2, addresses, metadata metadata = MetaData(testing.db) users = Table('query_users', metadata, - Column('user_id', INT, primary_key = True), + Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) addresses = Table('query_addresses', metadata, - Column('address_id', Integer, primary_key=True), + Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True), Column('user_id', Integer, ForeignKey('query_users.user_id')), Column('address', String(30))) @@ -26,7 +28,8 @@ class QueryTest(TestBase): ) metadata.create_all() - def tearDown(self): + @engines.close_first + def teardown(self): addresses.delete().execute() users.delete().execute() users2.delete().execute() @@ -52,89 +55,133 @@ class QueryTest(TestBase): assert users.count().scalar() == 1 users.update(users.c.user_id == 7).execute(user_name = 'fred') - assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred' + assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred' def test_lastrow_accessor(self): - """Tests the last_inserted_ids() and lastrow_has_id() functions.""" + """Tests the inserted_primary_key and lastrow_has_id() functions.""" - def insert_values(table, values): + def insert_values(engine, table, values): """ Inserts a row into a table, returns the full list of values INSERTed including defaults that fired off on the DB side and detects rows that had defaults and post-fetches. """ - result = table.insert().execute(**values) + result = engine.execute(table.insert(), **values) ret = values.copy() - for col, id in zip(table.primary_key, result.last_inserted_ids()): + for col, id in zip(table.primary_key, result.inserted_primary_key): ret[col.key] = id if result.lastrow_has_defaults(): - criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())]) - row = table.select(criterion).execute().fetchone() + criterion = and_(*[col==id for col, id in zip(table.primary_key, result.inserted_primary_key)]) + row = engine.execute(table.select(criterion)).first() for c in table.c: ret[c.key] = row[c] return ret - for supported, table, values, assertvalues in [ - ( - {'unsupported':['sqlite']}, - Table("t1", metadata, - Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True)), - {'foo':'hi'}, - {'id':1, 'foo':'hi'} - ), - ( - {'unsupported':['sqlite']}, - Table("t2", metadata, - Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') + if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): + test_engines = [ + engines.testing_engine(options={'implicit_returning':False}), + engines.testing_engine(options={'implicit_returning':True}), + ] + else: + test_engines = [testing.db] + + for engine in test_engines: + metadata = MetaData() + for supported, table, values, assertvalues in [ + ( + {'unsupported':['sqlite']}, + Table("t1", metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('foo', String(30), primary_key=True)), + {'foo':'hi'}, + {'id':1, 'foo':'hi'} ), - {'foo':'hi'}, - {'id':1, 'foo':'hi', 'bar':'hi'} - ), - ( - {'unsupported':[]}, - Table("t3", metadata, - Column("id", String(40), primary_key=True), - Column('foo', String(30), primary_key=True), - Column("bar", String(30)) + ( + {'unsupported':['sqlite']}, + Table("t2", metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('foo', String(30), primary_key=True), + Column('bar', String(30), server_default='hi') ), - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} - ), - ( - {'unsupported':[]}, - Table("t4", metadata, - Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') + {'foo':'hi'}, + {'id':1, 'foo':'hi', 'bar':'hi'} ), - {'foo':'hi', 'id':1}, - {'id':1, 'foo':'hi', 'bar':'hi'} - ), - ( - {'unsupported':[]}, - Table("t5", metadata, - Column('id', String(10), primary_key=True), - Column('bar', String(30), server_default='hi') + ( + {'unsupported':[]}, + Table("t3", metadata, + Column("id", String(40), primary_key=True), + Column('foo', String(30), primary_key=True), + Column("bar", String(30)) + ), + {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, + {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} ), - {'id':'id1'}, - {'id':'id1', 'bar':'hi'}, - ), - ]: - if testing.db.name in supported['unsupported']: - continue - try: - table.create() - i = insert_values(table, values) - assert i == assertvalues, repr(i) + " " + repr(assertvalues) - finally: - table.drop() + ( + {'unsupported':[]}, + Table("t4", metadata, + Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), + Column('foo', String(30), primary_key=True), + Column('bar', String(30), server_default='hi') + ), + {'foo':'hi', 'id':1}, + {'id':1, 'foo':'hi', 'bar':'hi'} + ), + ( + {'unsupported':[]}, + Table("t5", metadata, + Column('id', String(10), primary_key=True), + Column('bar', String(30), server_default='hi') + ), + {'id':'id1'}, + {'id':'id1', 'bar':'hi'}, + ), + ]: + if testing.db.name in supported['unsupported']: + continue + try: + table.create(bind=engine, checkfirst=True) + i = insert_values(engine, table, values) + assert i == assertvalues, "tablename: %s %r %r" % (table.name, repr(i), repr(assertvalues)) + finally: + table.drop(bind=engine) + + @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks") + def test_misordered_lastrow(self): + related = Table('related', metadata, + Column('id', Integer, primary_key=True) + ) + t6 = Table("t6", metadata, + Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True), + Column('auto_id', Integer, primary_key=True, test_needs_autoincrement=True), + ) + metadata.create_all() + r = related.insert().values(id=12).execute() + id = r.inserted_primary_key[0] + assert id==12 + + r = t6.insert().values(manual_id=id).execute() + eq_(r.inserted_primary_key, [12, 1]) + + def test_autoclose_on_insert(self): + if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): + test_engines = [ + engines.testing_engine(options={'implicit_returning':False}), + engines.testing_engine(options={'implicit_returning':True}), + ] + else: + test_engines = [testing.db] + + for engine in test_engines: + + r = engine.execute(users.insert(), + {'user_name':'jack'}, + ) + assert r.closed + def test_row_iteration(self): users.insert().execute( {'user_id':7, 'user_name':'jack'}, @@ -147,7 +194,7 @@ class QueryTest(TestBase): l.append(row) self.assert_(len(l) == 3) - @testing.fails_on('firebird', 'Data type unknown') + @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") @testing.requires.subqueries def test_anonymous_rows(self): users.insert().execute( @@ -161,6 +208,7 @@ class QueryTest(TestBase): assert row['anon_1'] == 8 assert row['anon_2'] == 10 + @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") def test_order_by_label(self): """test that a label within an ORDER BY works on each backend. @@ -179,6 +227,11 @@ class QueryTest(TestBase): select([concat]).order_by(concat).execute().fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)] ) + + eq_( + select([concat]).order_by(concat).execute().fetchall(), + [("test: ed",), ("test: fred",), ("test: jack",)] + ) concat = ("test: " + users.c.user_name).label('thedata') eq_( @@ -195,7 +248,7 @@ class QueryTest(TestBase): def test_row_comparison(self): users.insert().execute(user_id = 7, user_name = 'jack') - rp = users.select().execute().fetchone() + rp = users.select().execute().first() self.assert_(rp == rp) self.assert_(not(rp != rp)) @@ -207,8 +260,7 @@ class QueryTest(TestBase): self.assert_(not (rp != equal)) self.assert_(not (equal != equal)) - @testing.fails_on('mssql', 'No support for boolean logic in column select.') - @testing.fails_on('oracle', 'FIXME: unknown') + @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) @@ -218,11 +270,11 @@ class QueryTest(TestBase): eq_(testing.db.execute(select([or_(false, false)])).scalar(), False) eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True) - row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone() + row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first() assert row.x == False assert row.y == False - row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone() + row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first() assert row.x == True assert row.y == False @@ -253,6 +305,9 @@ class QueryTest(TestBase): eq_(expr.execute().fetchall(), result) + @testing.fails_on("firebird", "see dialect.test_firebird:MiscTest.test_percents_in_text") + @testing.fails_on("oracle", "neither % nor %% are accepted") + @testing.fails_on("+pg8000", "can't interpret result column from '%%'") @testing.emits_warning('.*now automatically escapes.*') def test_percents_in_text(self): for expr, result in ( @@ -277,7 +332,7 @@ class QueryTest(TestBase): eq_(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )]) - if testing.against('postgres'): + if testing.against('postgresql'): eq_(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )]) eq_(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), []) @@ -373,7 +428,7 @@ class QueryTest(TestBase): s = select([datetable.alias('x').c.today]).as_scalar() s2 = select([datetable.c.id, s.label('somelabel')]) #print s2.c.somelabel.type - assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime) + assert isinstance(s2.execute().first()['somelabel'], datetime.datetime) finally: datetable.drop() @@ -444,45 +499,58 @@ class QueryTest(TestBase): users.insert().execute(user_id=2, user_name='jack') addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com') - r = users.select(users.c.user_id==2).execute().fetchone() + r = users.select(users.c.user_id==2).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone() + + r = text("select * from query_users where user_id=2", bind=testing.db).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - + # test slices - r = text("select * from query_addresses", bind=testing.db).execute().fetchone() + r = text("select * from query_addresses", bind=testing.db).execute().first() self.assert_(r[0:1] == (1,)) self.assert_(r[1:] == (2, 'foo@bar.com')) self.assert_(r[:-1] == (1, 2)) - + # test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone() + "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().first() self.assert_(r['user_id']) == 1 self.assert_(r['user_name']) == "john" # test using literal tablename.colname - r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone() + r = text('select query_users.user_id AS "query_users.user_id", ' + 'query_users.user_name AS "query_users.user_name" from query_users', + bind=testing.db).execute().first() self.assert_(r['query_users.user_id']) == 1 self.assert_(r['query_users.user_name']) == "john" # unary experssions - r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().fetchone() + r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().first() eq_(r[users.c.user_name], 'jack') eq_(r.user_name, 'jack') - r.close() + + def test_result_case_sensitivity(self): + """test name normalization for result sets.""" + row = testing.db.execute( + select([ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive") + ]) + ).first() + + assert row.keys() == ["case_insensitive", "CaseSensitive"] + def test_row_as_args(self): users.insert().execute(user_id=1, user_name='john') - r = users.select(users.c.user_id==1).execute().fetchone() + r = users.select(users.c.user_id==1).execute().first() users.delete().execute() users.insert().execute(r) - assert users.select().execute().fetchall() == [(1, 'john')] - + eq_(users.select().execute().fetchall(), [(1, 'john')]) + def test_result_as_args(self): users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')]) r = users.select().execute() @@ -496,13 +564,12 @@ class QueryTest(TestBase): def test_ambiguous_column(self): users.insert().execute(user_id=1, user_name='john') - r = users.outerjoin(addresses).select().execute().fetchone() - try: - print r['user_id'] - assert False - except exc.InvalidRequestError, e: - assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \ - str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement." + r = users.outerjoin(addresses).select().execute().first() + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r['user_id'] + ) @testing.requires.subqueries def test_column_label_targeting(self): @@ -512,31 +579,29 @@ class QueryTest(TestBase): users.select().alias('foo'), users.select().alias(users.name), ): - row = s.select(use_labels=True).execute().fetchone() + row = s.select(use_labels=True).execute().first() assert row[s.c.user_id] == 7 assert row[s.c.user_name] == 'ed' def test_keys(self): users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() + r = users.select().execute().first() eq_([x.lower() for x in r.keys()], ['user_id', 'user_name']) def test_items(self): users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() + r = users.select().execute().first() eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')]) def test_len(self): users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() + r = users.select().execute().first() eq_(len(r), 2) - r.close() - r = testing.db.execute('select user_name, user_id from query_users').fetchone() + + r = testing.db.execute('select user_name, user_id from query_users').first() eq_(len(r), 2) - r.close() - r = testing.db.execute('select user_name from query_users').fetchone() + r = testing.db.execute('select user_name from query_users').first() eq_(len(r), 1) - r.close() def test_cant_execute_join(self): try: @@ -549,7 +614,7 @@ class QueryTest(TestBase): def test_column_order_with_simple_query(self): # should return values in column definition order users.insert().execute(user_id=1, user_name='foo') - r = users.select(users.c.user_id==1).execute().fetchone() + r = users.select(users.c.user_id==1).execute().first() eq_(r[0], 1) eq_(r[1], 'foo') eq_([x.lower() for x in r.keys()], ['user_id', 'user_name']) @@ -558,7 +623,7 @@ class QueryTest(TestBase): def test_column_order_with_text_query(self): # should return values in query order users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from query_users').fetchone() + r = testing.db.execute('select user_name, user_id from query_users').first() eq_(r[0], 'foo') eq_(r[1], 1) eq_([x.lower() for x in r.keys()], ['user_name', 'user_id']) @@ -580,7 +645,7 @@ class QueryTest(TestBase): shadowed.create(checkfirst=True) try: shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row') - r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone() + r = shadowed.select(shadowed.c.shadow_id==1).execute().first() self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow') self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') @@ -622,13 +687,13 @@ class QueryTest(TestBase): # Null values are not outside any set assert len(r) == 0 - u = bindparam('search_key') + @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") + def test_bind_in(self): + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'fred') + users.insert().execute(user_id = 9, user_name = None) - s = users.select(u.in_([])) - r = s.execute(search_key='john').fetchall() - assert len(r) == 0 - r = s.execute(search_key=None).fetchall() - assert len(r) == 0 + u = bindparam('search_key') s = users.select(not_(u.in_([]))) r = s.execute(search_key='john').fetchall() @@ -660,14 +725,15 @@ class QueryTest(TestBase): class PercentSchemaNamesTest(TestBase): """tests using percent signs, spaces in table and column names. - Doesn't pass for mysql, postgres, but this is really a + Doesn't pass for mysql, postgresql, but this is really a SQLAlchemy bug - we should be escaping out %% signs for this operation the same way we do for text() and column labels. """ + @classmethod @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') + @testing.crashes('postgresql', 'postgresql calls name % (params)') def setup_class(cls): global percent_table, metadata metadata = MetaData(testing.db) @@ -680,12 +746,12 @@ class PercentSchemaNamesTest(TestBase): @classmethod @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') + @testing.crashes('postgresql', 'postgresql calls name % (params)') def teardown_class(cls): metadata.drop_all() @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') + @testing.crashes('postgresql', 'postgresql calls name % (params)') def test_roundtrip(self): percent_table.insert().execute( {'percent%':5, '%(oneofthese)s':7, 'spaces % more spaces':12}, @@ -731,7 +797,7 @@ class PercentSchemaNamesTest(TestBase): percent_table.update().values({percent_table.c['%(oneofthese)s']:9, percent_table.c['spaces % more spaces']:15}).execute() eq_( - percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute().fetchall(), + percent_table.select().order_by(percent_table.c['percent%']).execute().fetchall(), [ (5, 9, 15), (7, 9, 15), @@ -852,7 +918,11 @@ class CompoundTest(TestBase): dict(col2="t3col2r2", col3="bbb", col4="aaa"), dict(col2="t3col2r3", col3="ccc", col4="bbb"), ]) - + + @engines.close_first + def teardown(self): + pass + @classmethod def teardown_class(cls): metadata.drop_all() @@ -878,6 +948,7 @@ class CompoundTest(TestBase): found2 = self._fetchall_sorted(u.alias('bar').select().execute()) eq_(found2, wanted) + @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs") def test_union_ordered(self): (s1, s2) = ( select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], @@ -891,6 +962,7 @@ class CompoundTest(TestBase): ('ccc', 'aaa')] eq_(u.execute().fetchall(), wanted) + @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs") @testing.fails_on('maxdb', 'FIXME: unknown') @testing.requires.subqueries def test_union_ordered_alias(self): @@ -907,6 +979,7 @@ class CompoundTest(TestBase): eq_(u.alias('bar').select().execute().fetchall(), wanted) @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') + @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery") @testing.fails_on('mysql', 'FIXME: unknown') @testing.fails_on('sqlite', 'FIXME: unknown') def test_union_all(self): @@ -925,6 +998,29 @@ class CompoundTest(TestBase): found2 = self._fetchall_sorted(e.alias('foo').select().execute()) eq_(found2, wanted) + def test_union_all_lightweight(self): + """like test_union_all, but breaks the sub-union into + a subquery with an explicit column reference on the outside, + more palatable to a wider variety of engines. + + """ + u = union( + select([t1.c.col3]), + select([t1.c.col3]), + ).alias() + + e = union_all( + select([t1.c.col3]), + select([u.c.col3]) + ) + + wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] + found1 = self._fetchall_sorted(e.execute()) + eq_(found1, wanted) + + found2 = self._fetchall_sorted(e.alias('foo').select().execute()) + eq_(found2, wanted) + @testing.crashes('firebird', 'Does not support intersect') @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') @testing.fails_on('mysql', 'FIXME: unknown') @@ -1330,3 +1426,6 @@ class OperatorTest(TestBase): order_by=flds.c.idcol).execute().fetchall(), [(2,),(1,)] ) + + + |
