diff options
Diffstat (limited to 'test/sql/test_query.py')
-rw-r--r-- | test/sql/test_query.py | 949 |
1 files changed, 513 insertions, 436 deletions
diff --git a/test/sql/test_query.py b/test/sql/test_query.py index ca2150443..48d9295fb 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -2,9 +2,11 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, is_ from sqlalchemy import testing from sqlalchemy.testing import fixtures, engines from sqlalchemy import util -import datetime -from sqlalchemy import * -from sqlalchemy import exc, sql +from sqlalchemy import ( + exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey, + union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence, + bindparam, literal, not_, type_coerce, literal_column, desc, asc, + TypeDecorator, or_, cast) from sqlalchemy.engine import default, result as _result from sqlalchemy.testing.schema import Table, Column @@ -12,6 +14,9 @@ from sqlalchemy.testing.schema import Table, Column # to test a dialect are being slowly migrated to # sqlalhcemy.testing.suite +users = users2 = addresses = metadata = None + + class QueryTest(fixtures.TestBase): __backend__ = True @@ -19,20 +24,27 @@ class QueryTest(fixtures.TestBase): def setup_class(cls): global users, users2, addresses, metadata metadata = MetaData(testing.db) - users = Table('query_users', metadata, - Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), + users = Table( + 'query_users', metadata, + Column( + 'user_id', INT, primary_key=True, + test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), test_needs_acid=True ) - addresses = Table('query_addresses', metadata, - Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True), + addresses = Table( + 'query_addresses', metadata, + Column( + 'address_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('user_id', Integer, ForeignKey('query_users.user_id')), Column('address', String(30)), test_needs_acid=True ) - users2 = Table('u2', metadata, - Column('user_id', INT, primary_key = True), + users2 = Table( + 'u2', metadata, + Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20)), test_needs_acid=True ) @@ -51,8 +63,10 @@ class QueryTest(fixtures.TestBase): @testing.requires.multivalues_inserts def test_multivalues_insert(self): - users.insert(values=[{'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}]).execute() + users.insert( + values=[ + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}]).execute() rows = users.select().order_by(users.c.user_id).execute().fetchall() self.assert_(rows[0] == (7, 'jack')) self.assert_(rows[1] == (8, 'ed')) @@ -65,23 +79,25 @@ class QueryTest(fixtures.TestBase): """test that executemany parameters are asserted to match the parameter set of the first.""" - assert_raises_message(exc.StatementError, + assert_raises_message( + exc.StatementError, r"A value is required for bind parameter 'user_name', in " - "parameter group 2 \(original cause: (sqlalchemy.exc.)?InvalidRequestError: A " + "parameter group 2 " + "\(original cause: (sqlalchemy.exc.)?InvalidRequestError: A " "value is required for bind parameter 'user_name', in " "parameter group 2\) u?'INSERT INTO query_users", users.insert().execute, - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9} + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9} ) # this succeeds however. We aren't yet doing # a length check on all subsequent parameters. users.insert().execute( - {'user_id':7}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9} + {'user_id': 7}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9} ) def test_lastrow_accessor(self): @@ -98,7 +114,8 @@ class QueryTest(fixtures.TestBase): if engine.dialect.implicit_returning: ins = table.insert() comp = ins.compile(engine, column_keys=list(values)) - if not set(values).issuperset(c.key for c in table.primary_key): + if not set(values).issuperset( + c.key for c in table.primary_key): assert comp.returning result = engine.execute(table.insert(), **values) @@ -108,8 +125,10 @@ class QueryTest(fixtures.TestBase): ret[col.key] = id if result.lastrow_has_defaults(): - criterion = and_(*[col==id for col, id in - zip(table.primary_key, result.inserted_primary_key)]) + criterion = and_( + *[ + col == id for col, id in + zip(table.primary_key, result.inserted_primary_key)]) row = engine.execute(table.select(criterion)).first() for c in table.c: ret[c.key] = row[c] @@ -120,8 +139,8 @@ class QueryTest(fixtures.TestBase): if testing.db.dialect.implicit_returning: test_engines = [ - engines.testing_engine(options={'implicit_returning':False}), - engines.testing_engine(options={'implicit_returning':True}), + engines.testing_engine(options={'implicit_returning': False}), + engines.testing_engine(options={'implicit_returning': True}), ] else: test_engines = [testing.db] @@ -130,60 +149,75 @@ class QueryTest(fixtures.TestBase): metadata = MetaData() for supported, table, values, assertvalues in [ ( - {'unsupported':['sqlite']}, - Table("t1", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + {'unsupported': ['sqlite']}, + Table( + "t1", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('foo', String(30), primary_key=True)), - {'foo':'hi'}, - {'id':1, 'foo':'hi'} + {'foo': 'hi'}, + {'id': 1, 'foo': 'hi'} ), ( - {'unsupported':['sqlite']}, - Table("t2", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + {'unsupported': ['sqlite']}, + Table( + "t2", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), server_default='hi') ), - {'foo':'hi'}, - {'id':1, 'foo':'hi', 'bar':'hi'} + {'foo': 'hi'}, + {'id': 1, 'foo': 'hi', 'bar': 'hi'} ), ( - {'unsupported':[]}, - Table("t3", metadata, + {'unsupported': []}, + Table( + "t3", metadata, Column("id", String(40), primary_key=True), Column('foo', String(30), primary_key=True), Column("bar", String(30)) - ), - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} + ), + {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}, + {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"} ), ( - {'unsupported':[]}, - Table("t4", metadata, - Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), + {'unsupported': []}, + Table( + "t4", metadata, + Column( + 'id', Integer, + Sequence('t4_id_seq', optional=True), + primary_key=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), server_default='hi') ), - {'foo':'hi', 'id':1}, - {'id':1, 'foo':'hi', 'bar':'hi'} + {'foo': 'hi', 'id': 1}, + {'id': 1, 'foo': 'hi', 'bar': 'hi'} ), ( - {'unsupported':[]}, - Table("t5", metadata, + {'unsupported': []}, + Table( + "t5", metadata, Column('id', String(10), primary_key=True), Column('bar', String(30), server_default='hi') ), - {'id':'id1'}, - {'id':'id1', 'bar':'hi'}, + {'id': 'id1'}, + {'id': 'id1', 'bar': 'hi'}, ), ( - {'unsupported':['sqlite']}, - Table("t6", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + {'unsupported': ['sqlite']}, + Table( + "t6", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('bar', Integer, primary_key=True) ), - {'bar':0}, - {'id':1, 'bar':0}, + {'bar': 0}, + {'id': 1, 'bar': 0}, ), ]: if testing.db.name in supported['unsupported']: @@ -192,7 +226,7 @@ class QueryTest(fixtures.TestBase): table.create(bind=engine, checkfirst=True) i = insert_values(engine, table, values) assert i == assertvalues, "tablename: %s %r %r" % \ - (table.name, repr(i), repr(assertvalues)) + (table.name, repr(i), repr(assertvalues)) finally: table.drop(bind=engine) @@ -201,44 +235,50 @@ class QueryTest(fixtures.TestBase): def test_lastrowid_zero(self): from sqlalchemy.dialects import sqlite eng = engines.testing_engine() + class ExcCtx(sqlite.base.SQLiteExecutionContext): def get_lastrowid(self): return 0 eng.dialect.execution_ctx_cls = ExcCtx - t = Table('t', MetaData(), Column('x', Integer, primary_key=True), - Column('y', Integer)) + t = Table( + 't', MetaData(), Column('x', Integer, primary_key=True), + Column('y', Integer)) t.create(eng) r = eng.execute(t.insert().values(y=5)) eq_(r.inserted_primary_key, [0]) - - @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks") + @testing.fails_on( + 'sqlite', "sqlite autoincremnt doesn't work with composite pks") def test_misordered_lastrow(self): - related = Table('related', metadata, + related = Table( + 'related', metadata, Column('id', Integer, primary_key=True), mysql_engine='MyISAM' ) - t6 = Table("t6", metadata, - Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True), - Column('auto_id', Integer, primary_key=True, - test_needs_autoincrement=True), + t6 = Table( + "t6", metadata, + Column( + 'manual_id', Integer, ForeignKey('related.id'), + primary_key=True), + Column( + 'auto_id', Integer, primary_key=True, + test_needs_autoincrement=True), mysql_engine='MyISAM' ) metadata.create_all() r = related.insert().values(id=12).execute() id = r.inserted_primary_key[0] - assert id==12 + assert id == 12 r = t6.insert().values(manual_id=id).execute() eq_(r.inserted_primary_key, [12, 1]) - def test_row_iteration(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) r = users.select().execute() l = [] @@ -247,22 +287,24 @@ class QueryTest(fixtures.TestBase): self.assert_(len(l) == 3) @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.requires.subqueries def test_anonymous_rows(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) - sel = select([users.c.user_id]).where(users.c.user_name=='jack').as_scalar() + sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \ + as_scalar() for row in select([sel + 1, sel + 3], bind=users.bind).execute(): assert row['anon_1'] == 8 assert row['anon_2'] == 10 - @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") + @testing.fails_on( + 'firebird', "kinterbasdb doesn't send full type information") def test_order_by_label(self): """test that a label within an ORDER BY works on each backend. @@ -273,9 +315,9 @@ class QueryTest(fixtures.TestBase): """ users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) concat = ("test: " + users.c.user_name).label('thedata') @@ -298,20 +340,20 @@ class QueryTest(fixtures.TestBase): @testing.requires.order_by_label_with_expression def test_order_by_label_compound(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) concat = ("test: " + users.c.user_name).label('thedata') eq_( - select([concat]).order_by(literal_column('thedata') + "x").execute().fetchall(), + select([concat]).order_by(literal_column('thedata') + "x"). + execute().fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)] ) - def test_row_comparison(self): - users.insert().execute(user_id = 7, user_name='jack') + users.insert().execute(user_id=7, user_name='jack') rp = users.select().execute().first() self.assert_(rp == rp) @@ -355,14 +397,14 @@ class QueryTest(fixtures.TestBase): else: eq_(control, op(compare, rp)) - - @testing.provide_metadata def test_column_label_overlap_fallback(self): - content = Table('content', self.metadata, + content = Table( + 'content', self.metadata, Column('type', String(30)), ) - bar = Table('bar', self.metadata, + bar = Table( + 'bar', self.metadata, Column('content_type', String(30)) ) self.metadata.create_all(testing.db) @@ -373,14 +415,16 @@ class QueryTest(fixtures.TestBase): assert bar.c.content_type not in row assert sql.column('content_type') in row - row = testing.db.execute(select([content.c.type.label("content_type")])).first() + row = testing.db.execute( + select([content.c.type.label("content_type")])).first() assert content.c.type in row assert bar.c.content_type not in row assert sql.column('content_type') in row - row = testing.db.execute(select([func.now().label("content_type")])).first() + row = testing.db.execute(select([func.now().label("content_type")])). \ + first() assert content.c.type not in row assert bar.c.content_type not in row @@ -389,14 +433,15 @@ class QueryTest(fixtures.TestBase): def test_pickled_rows(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) for pickle in False, True: for use_labels in False, True: - result = users.select(use_labels=use_labels).order_by(users.c.user_id).execute().fetchall() + result = users.select(use_labels=use_labels).order_by( + users.c.user_id).execute().fetchall() if pickle: result = util.pickle.loads(util.pickle.dumps(result)) @@ -407,7 +452,9 @@ class QueryTest(fixtures.TestBase): ) if use_labels: eq_(result[0]['query_users_user_id'], 7) - eq_(list(result[0].keys()), ["query_users_user_id", "query_users_user_name"]) + eq_( + list(result[0].keys()), + ["query_users_user_id", "query_users_user_name"]) else: eq_(result[0]['user_id'], 7) eq_(list(result[0].keys()), ["user_id", "user_name"]) @@ -417,17 +464,23 @@ class QueryTest(fixtures.TestBase): eq_(result[0][users.c.user_name], 'jack') if not pickle or use_labels: - assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.user_id]) + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.user_id]) else: # test with a different table. name resolution is # causing 'user_id' to match when use_labels wasn't used. eq_(result[0][addresses.c.user_id], 7) - assert_raises(exc.NoSuchColumnError, lambda: result[0]['fake key']) - assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.address_id]) + assert_raises( + exc.NoSuchColumnError, lambda: result[0]['fake key']) + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.address_id]) def test_column_error_printing(self): row = testing.db.execute(select([1])).first() + class unprintable(object): def __str__(self): raise ValueError("nope") @@ -446,10 +499,9 @@ class QueryTest(fixtures.TestBase): lambda: row[accessor] ) - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) @@ -458,20 +510,28 @@ class QueryTest(fixtures.TestBase): eq_(testing.db.execute(select([and_(true, true)])).scalar(), True) eq_(testing.db.execute(select([or_(true, false)])).scalar(), True) eq_(testing.db.execute(select([or_(false, false)])).scalar(), False) - eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True) + eq_( + testing.db.execute(select([not_(or_(false, false))])).scalar(), + True) - row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first() - assert row.x == False - assert row.y == False + row = testing.db.execute( + select( + [or_(false, false).label("x"), + and_(true, false).label("y")])).first() + assert row.x == False # noqa + assert row.y == False # noqa - row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first() - assert row.x == True - assert row.y == False + row = testing.db.execute( + select( + [or_(true, false).label("x"), + and_(true, false).label("y")])).first() + assert row.x == True # noqa + assert row.y == False # noqa def test_fetchmany(self): - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'ed') - users.insert().execute(user_id = 9, user_name = 'fred') + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='ed') + users.insert().execute(user_id=9, user_name='fred') r = users.select().execute() l = [] for row in r.fetchmany(size=2): @@ -480,32 +540,29 @@ class QueryTest(fixtures.TestBase): def test_like_ops(self): users.insert().execute( - {'user_id':1, 'user_name':'apples'}, - {'user_id':2, 'user_name':'oranges'}, - {'user_id':3, 'user_name':'bananas'}, - {'user_id':4, 'user_name':'legumes'}, - {'user_id':5, 'user_name':'hi % there'}, + {'user_id': 1, 'user_name': 'apples'}, + {'user_id': 2, 'user_name': 'oranges'}, + {'user_id': 3, 'user_name': 'bananas'}, + {'user_id': 4, 'user_name': 'legumes'}, + {'user_id': 5, 'user_name': 'hi % there'}, ) for expr, result in ( - (select([users.c.user_id]).\ - where(users.c.user_name.startswith('apple')), [(1,)]), - (select([users.c.user_id]).\ - where(users.c.user_name.contains('i % t')), [(5,)]), - (select([users.c.user_id]).\ - where( - users.c.user_name.endswith('anas') - ), [(3,)]), - (select([users.c.user_id]).\ - where( - users.c.user_name.contains('i % t', escape='&') - ), [(5,)]), + (select([users.c.user_id]). + where(users.c.user_name.startswith('apple')), [(1,)]), + (select([users.c.user_id]). + where(users.c.user_name.contains('i % t')), [(5,)]), + (select([users.c.user_id]). + where(users.c.user_name.endswith('anas')), [(3,)]), + (select([users.c.user_id]). + where(users.c.user_name.contains('i % t', escape='&')), + [(5,)]), ): eq_(expr.execute().fetchall(), result) @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.requires.mod_operator_as_percent_sign @testing.emits_warning('.*now automatically escapes.*') def test_percents_in_text(self): @@ -521,36 +578,44 @@ class QueryTest(fixtures.TestBase): def test_ilike(self): users.insert().execute( - {'user_id':1, 'user_name':'one'}, - {'user_id':2, 'user_name':'TwO'}, - {'user_id':3, 'user_name':'ONE'}, - {'user_id':4, 'user_name':'OnE'}, + {'user_id': 1, 'user_name': 'one'}, + {'user_id': 2, 'user_name': 'TwO'}, + {'user_id': 3, 'user_name': 'ONE'}, + {'user_id': 4, 'user_name': 'OnE'}, ) - eq_(select([users.c.user_id]).where(users.c.user_name.ilike('one')).execute().fetchall(), [(1, ), (3, ), (4, )]) + eq_( + select([users.c.user_id]).where(users.c.user_name.ilike('one')). + execute().fetchall(), [(1, ), (3, ), (4, )]) - eq_(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )]) + eq_( + select([users.c.user_id]).where(users.c.user_name.ilike('TWO')). + execute().fetchall(), [(2, )]) if testing.against('postgresql'): - eq_(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )]) - eq_(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), []) - + eq_( + select([users.c.user_id]). + where(users.c.user_name.like('one')).execute().fetchall(), + [(1, )]) + eq_( + select([users.c.user_id]). + where(users.c.user_name.like('TWO')).execute().fetchall(), []) def test_compiled_execute(self): - users.insert().execute(user_id = 7, user_name = 'jack') - s = select([users], users.c.user_id==bindparam('id')).compile() + users.insert().execute(user_id=7, user_name='jack') + s = select([users], users.c.user_id == bindparam('id')).compile() c = testing.db.connect() assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 def test_compiled_insert_execute(self): - users.insert().compile().execute(user_id = 7, user_name = 'jack') - s = select([users], users.c.user_id==bindparam('id')).compile() + users.insert().compile().execute(user_id=7, user_name='jack') + s = select([users], users.c.user_id == bindparam('id')).compile() c = testing.db.connect() assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") def test_repeated_bindparams(self): """Tests that a BindParam can be used more than once. @@ -558,11 +623,11 @@ class QueryTest(fixtures.TestBase): paramstyles. """ - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') u = bindparam('userid') - s = users.select(and_(users.c.user_name==u, users.c.user_name==u)) + s = users.select(and_(users.c.user_name == u, users.c.user_name == u)) r = s.execute(userid='fred').fetchall() assert len(r) == 1 @@ -599,10 +664,12 @@ class QueryTest(fixtures.TestBase): @testing.requires.standalone_binds def test_select_from_bindparam(self): - """Test result row processing when selecting from a plain bind param.""" + """Test result row processing when selecting from a plain bind + param.""" class MyInteger(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): return int(value[4:]) @@ -614,13 +681,11 @@ class QueryTest(fixtures.TestBase): "INT_5" ) eq_( - testing.db.scalar(select([cast("INT_5", type_=MyInteger).label('foo')])), + testing.db.scalar( + select([cast("INT_5", type_=MyInteger).label('foo')])), "INT_5" ) - - - def test_order_by(self): """Exercises ORDER BY clause generation. @@ -719,32 +784,42 @@ class QueryTest(fixtures.TestBase): use_labels=labels), [(1, None), (2, 'b'), (3, 'a')]) - a_eq(users.select(order_by=[users.c.user_name.desc().nullslast()], - use_labels=labels), - [(2, 'b'), (3, 'a'), (1, None)]) + a_eq( + users.select( + order_by=[users.c.user_name.desc().nullslast()], + use_labels=labels), + [(2, 'b'), (3, 'a'), (1, None)]) - a_eq(users.select(order_by=[desc(users.c.user_name).nullsfirst()], - use_labels=labels), - [(1, None), (2, 'b'), (3, 'a')]) + a_eq( + users.select( + order_by=[desc(users.c.user_name).nullsfirst()], + use_labels=labels), + [(1, None), (2, 'b'), (3, 'a')]) a_eq(users.select(order_by=[desc(users.c.user_name).nullslast()], use_labels=labels), [(2, 'b'), (3, 'a'), (1, None)]) - a_eq(users.select(order_by=[users.c.user_name.nullsfirst(), users.c.user_id], - use_labels=labels), - [(1, None), (3, 'a'), (2, 'b')]) + a_eq( + users.select( + order_by=[users.c.user_name.nullsfirst(), users.c.user_id], + use_labels=labels), + [(1, None), (3, 'a'), (2, 'b')]) - a_eq(users.select(order_by=[users.c.user_name.nullslast(), users.c.user_id], - use_labels=labels), - [(3, 'a'), (2, 'b'), (1, None)]) + a_eq( + users.select( + order_by=[users.c.user_name.nullslast(), users.c.user_id], + use_labels=labels), + [(3, 'a'), (2, 'b'), (1, None)]) def test_column_slices(self): users.insert().execute(user_id=1, user_name='john') users.insert().execute(user_id=2, user_name='jack') - addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com') + addresses.insert().execute( + address_id=1, user_id=2, address='foo@bar.com') - r = text("select * from query_addresses", bind=testing.db).execute().first() + r = text( + "select * from query_addresses", bind=testing.db).execute().first() self.assert_(r[0:1] == (1,)) self.assert_(r[1:] == (2, 'foo@bar.com')) self.assert_(r[:-1] == (1, 2)) @@ -755,9 +830,10 @@ class QueryTest(fixtures.TestBase): dict(user_id=2, user_name='jack') ) - r = users.select(users.c.user_id==2).execute().first() + r = users.select(users.c.user_id == 2).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + self.assert_( + r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') def test_column_accessor_basic_text(self): users.insert().execute( @@ -765,10 +841,10 @@ class QueryTest(fixtures.TestBase): dict(user_id=2, user_name='jack') ) r = testing.db.execute( - text("select * from query_users where user_id=2") - ).first() + text("select * from query_users where user_id=2")).first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + self.assert_( + r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') def test_column_accessor_textual_select(self): users.insert().execute( @@ -779,10 +855,11 @@ class QueryTest(fixtures.TestBase): # the select(), these need to match on name anyway r = testing.db.execute( select(['user_id', 'user_name']).select_from('query_users'). - where('user_id=2') + where('user_id=2') ).first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + self.assert_( + r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') def test_column_accessor_dotted_union(self): users.insert().execute( @@ -792,10 +869,13 @@ class QueryTest(fixtures.TestBase): # test a little sqlite weirdness - with the UNION, # cols come back as "query_users.user_id" in cursor.description r = testing.db.execute( - text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users" - ) - ).first() + text( + "select query_users.user_id, query_users.user_name " + "from query_users " + "UNION select query_users.user_id, " + "query_users.user_name from query_users" + ) + ).first() eq_(r['user_id'], 1) eq_(r['user_name'], "john") eq_(list(r.keys()), ["user_id", "user_name"]) @@ -806,9 +886,13 @@ class QueryTest(fixtures.TestBase): dict(user_id=1, user_name='john'), ) - r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", - bind=testing.db).execution_options(sqlite_raw_colnames=True).execute().first() + r = text( + "select query_users.user_id, query_users.user_name " + "from query_users " + "UNION select query_users.user_id, " + "query_users.user_name from query_users", + bind=testing.db).execution_options(sqlite_raw_colnames=True). \ + execute().first() assert 'user_id' not in r assert 'user_name' not in r eq_(r['query_users.user_id'], 1) @@ -821,8 +905,11 @@ class QueryTest(fixtures.TestBase): dict(user_id=1, user_name='john'), ) - r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", + r = text( + "select query_users.user_id, query_users.user_name " + "from query_users " + "UNION select query_users.user_id, " + "query_users.user_name from query_users", bind=testing.db).execute().first() eq_(r['user_id'], 1) eq_(r['user_name'], "john") @@ -835,9 +922,11 @@ class QueryTest(fixtures.TestBase): dict(user_id=1, user_name='john'), ) # test using literal tablename.colname - r = text('select query_users.user_id AS "query_users.user_id", ' - 'query_users.user_name AS "query_users.user_name" from query_users', - bind=testing.db).execution_options(sqlite_raw_colnames=True).execute().first() + r = text( + 'select query_users.user_id AS "query_users.user_id", ' + 'query_users.user_name AS "query_users.user_name" ' + 'from query_users', bind=testing.db).\ + execution_options(sqlite_raw_colnames=True).execute().first() eq_(r['query_users.user_id'], 1) eq_(r['query_users.user_name'], "john") assert "user_name" not in r @@ -849,7 +938,8 @@ class QueryTest(fixtures.TestBase): ) # unary experssions - r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().first() + r = select([users.c.user_name.distinct()]).order_by( + users.c.user_name).execute().first() eq_(r[users.c.user_name], 'john') eq_(r.user_name, 'john') @@ -866,8 +956,6 @@ class QueryTest(fixtures.TestBase): lambda: r['foo'] ) - - def test_graceful_fetch_on_non_rows(self): """test that calling fetchone() etc. on a result that doesn't return rows fails gracefully. @@ -895,7 +983,8 @@ class QueryTest(fixtures.TestBase): @testing.requires.empty_inserts @testing.requires.returning def test_no_inserted_pk_on_returning(self): - result = testing.db.execute(users.insert().returning(users.c.user_id, users.c.user_name)) + result = testing.db.execute(users.insert().returning( + users.c.user_id, users.c.user_name)) assert_raises_message( exc.InvalidRequestError, r"Can't call inserted_primary_key when returning\(\) is used.", @@ -933,7 +1022,7 @@ class QueryTest(fixtures.TestBase): ) def test_row_case_insensitive(self): - ins_db = engines.testing_engine(options={"case_sensitive":False}) + ins_db = engines.testing_engine(options={"case_sensitive": False}) row = ins_db.execute( select([ literal_column("1").label("case_insensitive"), @@ -944,21 +1033,20 @@ class QueryTest(fixtures.TestBase): eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) eq_(row["case_insensitive"], 1) eq_(row["CaseSensitive"], 2) - eq_(row["Case_insensitive"],1) - eq_(row["casesensitive"],2) - + eq_(row["Case_insensitive"], 1) + eq_(row["casesensitive"], 2) def test_row_as_args(self): users.insert().execute(user_id=1, user_name='john') - r = users.select(users.c.user_id==1).execute().first() + r = users.select(users.c.user_id == 1).execute().first() users.delete().execute() users.insert().execute(r) eq_(users.select().execute().fetchall(), [(1, 'john')]) def test_result_as_args(self): users.insert().execute([ - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='ed')]) + dict(user_id=1, user_name='john'), + dict(user_id=2, user_name='ed')]) r = users.select().execute() users2.insert().execute(list(r)) eq_( @@ -1066,10 +1154,8 @@ class QueryTest(fixtures.TestBase): # in 0.8, both columns are present so it's True; # but when they're fetched you'll get the ambiguous error. users.insert().execute(user_id=1, user_name='john') - result = select([ - users.c.user_id, - addresses.c.user_id]).\ - select_from(users.outerjoin(addresses)).execute() + result = select([users.c.user_id, addresses.c.user_id]).\ + select_from(users.outerjoin(addresses)).execute() row = result.first() eq_( @@ -1079,9 +1165,9 @@ class QueryTest(fixtures.TestBase): def test_ambiguous_column_by_col_plus_label(self): users.insert().execute(user_id=1, user_name='john') - result = select([users.c.user_id, - type_coerce(users.c.user_id, Integer).label('foo')] - ).execute() + result = select( + [users.c.user_id, + type_coerce(users.c.user_id, Integer).label('foo')]).execute() row = result.first() eq_( row[users.c.user_id], 1 @@ -1112,25 +1198,27 @@ class QueryTest(fixtures.TestBase): def test_items(self): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().first() - eq_([(x[0].lower(), x[1]) for x in list(r.items())], [('user_id', 1), ('user_name', 'foo')]) + eq_( + [(x[0].lower(), x[1]) for x in list(r.items())], + [('user_id', 1), ('user_name', 'foo')]) def test_len(self): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().first() eq_(len(r), 2) - r = testing.db.execute('select user_name, user_id from query_users').first() + r = testing.db.execute('select user_name, user_id from query_users'). \ + first() eq_(len(r), 2) r = testing.db.execute('select user_name from query_users').first() eq_(len(r), 1) - def test_sorting_in_python(self): users.insert().execute( - dict(user_id=1, user_name='foo'), - dict(user_id=2, user_name='bar'), - dict(user_id=3, user_name='def'), - ) + dict(user_id=1, user_name='foo'), + dict(user_id=2, user_name='bar'), + dict(user_id=3, user_name='def'), + ) rows = users.select().order_by(users.c.user_name).execute().fetchall() @@ -1141,7 +1229,7 @@ class QueryTest(fixtures.TestBase): def test_column_order_with_simple_query(self): # should return values in column definition order users.insert().execute(user_id=1, user_name='foo') - r = users.select(users.c.user_id==1).execute().first() + r = users.select(users.c.user_id == 1).execute().first() eq_(r[0], 1) eq_(r[1], 'foo') eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) @@ -1150,7 +1238,8 @@ class QueryTest(fixtures.TestBase): def test_column_order_with_text_query(self): # should return values in query order users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from query_users').first() + r = testing.db.execute('select user_name, user_id from query_users'). \ + first() eq_(r[0], 'foo') eq_(r[1], 1) eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) @@ -1160,25 +1249,32 @@ class QueryTest(fixtures.TestBase): @testing.crashes('firebird', 'An identifier must begin with a letter') def test_column_accessor_shadow(self): meta = MetaData(testing.db) - shadowed = Table('test_shadowed', meta, - Column('shadow_id', INT, primary_key = True), - Column('shadow_name', VARCHAR(20)), - Column('parent', VARCHAR(20)), - Column('row', VARCHAR(40)), - Column('_parent', VARCHAR(20)), - Column('_row', VARCHAR(20)), + shadowed = Table( + 'test_shadowed', meta, + Column('shadow_id', INT, primary_key=True), + Column('shadow_name', VARCHAR(20)), + Column('parent', VARCHAR(20)), + Column('row', VARCHAR(40)), + Column('_parent', VARCHAR(20)), + Column('_row', VARCHAR(20)), ) shadowed.create(checkfirst=True) try: - shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', - row='Without light there is no shadow', - _parent='Hidden parent', - _row='Hidden row') - r = shadowed.select(shadowed.c.shadow_id==1).execute().first() - self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) - self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow') - self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') - self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow') + shadowed.insert().execute( + shadow_id=1, shadow_name='The Shadow', parent='The Light', + row='Without light there is no shadow', + _parent='Hidden parent', _row='Hidden row') + r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() + self.assert_( + r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) + self.assert_( + r.shadow_name == r['shadow_name'] == + r[shadowed.c.shadow_name] == 'The Shadow') + self.assert_( + r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') + self.assert_( + r.row == r['row'] == r[shadowed.c.row] == + 'Without light there is no shadow') self.assert_(r['_parent'] == 'Hidden parent') self.assert_(r['_row'] == 'Hidden row') finally: @@ -1188,9 +1284,9 @@ class QueryTest(fixtures.TestBase): def test_in_filtering(self): """test the behavior of the in_() function.""" - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) s = users.select(users.c.user_name.in_([])) r = s.execute().fetchall() @@ -1202,24 +1298,24 @@ class QueryTest(fixtures.TestBase): # All usernames with a value are outside an empty set assert len(r) == 2 - s = users.select(users.c.user_name.in_(['jack','fred'])) + s = users.select(users.c.user_name.in_(['jack', 'fred'])) r = s.execute().fetchall() assert len(r) == 2 - s = users.select(not_(users.c.user_name.in_(['jack','fred']))) + s = users.select(not_(users.c.user_name.in_(['jack', 'fred']))) r = s.execute().fetchall() # Null values are not outside any set assert len(r) == 0 @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.emits_warning('.*empty sequence.*') @testing.fails_on('firebird', "uses sql-92 rules") @testing.fails_on('sybase', "uses sql-92 rules") - @testing.fails_if(lambda: - testing.against('mssql+pyodbc') and not testing.db.dialect.freetds, - "uses sql-92 rules") + @testing.fails_if( + lambda: testing.against('mssql+pyodbc') and not + testing.db.dialect.freetds, "uses sql-92 rules") def test_bind_in(self): """test calling IN against a bind parameter. @@ -1228,9 +1324,9 @@ class QueryTest(fixtures.TestBase): """ - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) u = bindparam('search_key') @@ -1241,21 +1337,20 @@ class QueryTest(fixtures.TestBase): assert len(r) == 0 @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.emits_warning('.*empty sequence.*') def test_literal_in(self): """similar to test_bind_in but use a bind with a value.""" - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) s = users.select(not_(literal("john").in_([]))) r = s.execute().fetchall() assert len(r) == 3 - @testing.emits_warning('.*empty sequence.*') @testing.requires.boolean_col_expressions def test_in_filtering_advanced(self): @@ -1265,31 +1360,33 @@ class QueryTest(fixtures.TestBase): """ - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) - s = users.select(users.c.user_name.in_([]) == True) + s = users.select(users.c.user_name.in_([]) == True) # noqa r = s.execute().fetchall() assert len(r) == 0 - s = users.select(users.c.user_name.in_([]) == False) + s = users.select(users.c.user_name.in_([]) == False) # noqa r = s.execute().fetchall() assert len(r) == 2 - s = users.select(users.c.user_name.in_([]) == None) + s = users.select(users.c.user_name.in_([]) == None) # noqa r = s.execute().fetchall() assert len(r) == 1 + class RequiredBindTest(fixtures.TablesTest): run_create_tables = None run_deletes = None @classmethod def define_tables(cls, metadata): - Table('foo', metadata, - Column('id', Integer, primary_key=True), - Column('data', String(50)), - Column('x', Integer) - ) + Table( + 'foo', metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)), + Column('x', Integer) + ) def _assert_raises(self, stmt, params): assert_raises_message( @@ -1303,19 +1400,15 @@ class RequiredBindTest(fixtures.TablesTest): testing.db.execute, stmt, params) def test_insert(self): - stmt = self.tables.foo.insert().values(x=bindparam('x'), - data=bindparam('data')) - self._assert_raises( - stmt, {'data': 'data'} - ) + stmt = self.tables.foo.insert().values( + x=bindparam('x'), data=bindparam('data')) + self._assert_raises(stmt, {'data': 'data'}) def test_select_where(self): - stmt = select([self.tables.foo]).\ - where(self.tables.foo.c.data == bindparam('data')).\ - where(self.tables.foo.c.x == bindparam('x')) - self._assert_raises( - stmt, {'data': 'data'} - ) + stmt = select([self.tables.foo]). \ + where(self.tables.foo.c.data == bindparam('data')). \ + where(self.tables.foo.c.x == bindparam('x')) + self._assert_raises(stmt, {'data': 'data'}) @testing.requires.standalone_binds def test_select_columns(self): @@ -1341,6 +1434,7 @@ class RequiredBindTest(fixtures.TablesTest): is_(bindparam('foo', callable_=c).required, False) is_(bindparam('foo', callable_=c, required=False).required, False) + class TableInsertTest(fixtures.TablesTest): """test for consistent insert behavior across dialects regarding the inline=True flag, lower-case 't' tables. @@ -1351,21 +1445,22 @@ class TableInsertTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): - Table('foo', metadata, - Column('id', Integer, Sequence('t_id_seq'), primary_key=True), - Column('data', String(50)), - Column('x', Integer) - ) + Table( + 'foo', metadata, + Column('id', Integer, Sequence('t_id_seq'), primary_key=True), + Column('data', String(50)), + Column('x', Integer) + ) def _fixture(self, types=True): if types: - t = sql.table('foo', sql.column('id', Integer), - sql.column('data', String), - sql.column('x', Integer)) + t = sql.table( + 'foo', sql.column('id', Integer), + sql.column('data', String), + sql.column('x', Integer)) else: - t = sql.table('foo', sql.column('id'), - sql.column('data'), - sql.column('x')) + t = sql.table( + 'foo', sql.column('id'), sql.column('data'), sql.column('x')) return t def _test(self, stmt, row, returning=None, inserted_primary_key=False): @@ -1382,8 +1477,9 @@ class TableInsertTest(fixtures.TablesTest): def _test_multi(self, stmt, rows, data): testing.db.execute(stmt, rows) eq_( - testing.db.execute(self.tables.foo.select(). - order_by(self.tables.foo.c.id)).fetchall(), + testing.db.execute( + self.tables.foo.select(). + order_by(self.tables.foo.c.id)).fetchall(), data) @testing.requires.sequences @@ -1391,19 +1487,14 @@ class TableInsertTest(fixtures.TablesTest): t = self._fixture() self._test( t.insert().values( - id=func.next_value(Sequence('t_id_seq')), - data='data', x=5 - ), + id=func.next_value(Sequence('t_id_seq')), data='data', x=5), (1, 'data', 5) ) def test_uppercase(self): t = self.tables.foo self._test( - t.insert().values( - id=1, - data='data', x=5 - ), + t.insert().values(id=1, data='data', x=5), (1, 'data', 5), inserted_primary_key=[1] ) @@ -1411,22 +1502,18 @@ class TableInsertTest(fixtures.TablesTest): def test_uppercase_inline(self): t = self.tables.foo self._test( - t.insert(inline=True).values( - id=1, - data='data', x=5 - ), + t.insert(inline=True).values(id=1, data='data', x=5), (1, 'data', 5), inserted_primary_key=[1] ) - @testing.crashes("mssql+pyodbc", - "Pyodbc + SQL Server + Py3K, some decimal handling issue") + @testing.crashes( + "mssql+pyodbc", + "Pyodbc + SQL Server + Py3K, some decimal handling issue") def test_uppercase_inline_implicit(self): t = self.tables.foo self._test( - t.insert(inline=True).values( - data='data', x=5 - ), + t.insert(inline=True).values(data='data', x=5), (1, 'data', 5), inserted_primary_key=[None] ) @@ -1451,14 +1538,13 @@ class TableInsertTest(fixtures.TablesTest): def test_uppercase_direct_params_returning(self): t = self.tables.foo self._test( - t.insert().values( - id=1, data='data', x=5).returning(t.c.id, t.c.x), + t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), (1, 'data', 5), returning=(1, 5) ) - @testing.fails_on('mssql', - "lowercase table doesn't support identity insert disable") + @testing.fails_on( + 'mssql', "lowercase table doesn't support identity insert disable") def test_direct_params(self): t = self._fixture() self._test( @@ -1467,14 +1553,13 @@ class TableInsertTest(fixtures.TablesTest): inserted_primary_key=[] ) - @testing.fails_on('mssql', - "lowercase table doesn't support identity insert disable") + @testing.fails_on( + 'mssql', "lowercase table doesn't support identity insert disable") @testing.requires.returning def test_direct_params_returning(self): t = self._fixture() self._test( - t.insert().values( - id=1, data='data', x=5).returning(t.c.id, t.c.x), + t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), (1, 'data', 5), returning=(1, 5) ) @@ -1483,8 +1568,7 @@ class TableInsertTest(fixtures.TablesTest): def test_implicit_pk(self): t = self._fixture() self._test( - t.insert().values( - data='data', x=5), + t.insert().values(data='data', x=5), (1, 'data', 5), inserted_primary_key=[] ) @@ -1495,9 +1579,9 @@ class TableInsertTest(fixtures.TablesTest): self._test_multi( t.insert(), [ - {'data':'d1', 'x':5}, - {'data':'d2', 'x':6}, - {'data':'d3', 'x':7}, + {'data': 'd1', 'x': 5}, + {'data': 'd2', 'x': 6}, + {'data': 'd3', 'x': 7}, ], [ (1, 'd1', 5), @@ -1523,32 +1607,19 @@ class KeyTargetingTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): - keyed1 = Table('keyed1', metadata, - Column("a", CHAR(2), key="b"), - Column("c", CHAR(2), key="q") - ) - keyed2 = Table('keyed2', metadata, - Column("a", CHAR(2)), - Column("b", CHAR(2)), - ) - keyed3 = Table('keyed3', metadata, - Column("a", CHAR(2)), - Column("d", CHAR(2)), - ) - keyed4 = Table('keyed4', metadata, - Column("b", CHAR(2)), - Column("q", CHAR(2)), - ) - - content = Table('content', metadata, - Column('t', String(30), key="type"), - ) - bar = Table('bar', metadata, - Column('ctype', String(30), key="content_type") + Table( + 'keyed1', metadata, Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q") ) + Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) + Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) + Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) + Table('content', metadata, Column('t', String(30), key="type")) + Table('bar', metadata, Column('ctype', String(30), key="content_type")) if testing.requires.schemas.enabled: - wschema = Table('wschema', metadata, + Table( + 'wschema', metadata, Column("a", CHAR(2), key="b"), Column("c", CHAR(2), key="q"), schema="test_schema" @@ -1563,7 +1634,8 @@ class KeyTargetingTest(fixtures.TablesTest): cls.tables.content.insert().execute(type="t1") if testing.requires.schemas.enabled: - cls.tables['test_schema.wschema'].insert().execute(dict(b="a1", q="c1")) + cls.tables['test_schema.wschema'].insert().execute( + dict(b="a1", q="c1")) @testing.requires.schemas def test_keyed_accessor_wschema(self): @@ -1575,7 +1647,6 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row.a, "a1") eq_(row.c, "c1") - def test_keyed_accessor_single(self): keyed1 = self.tables.keyed1 row = testing.db.execute(keyed1.select()).first() @@ -1642,7 +1713,8 @@ class KeyTargetingTest(fixtures.TablesTest): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 - row = testing.db.execute(select([keyed1, keyed2]).apply_labels()).first() + row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \ + first() eq_(row.keyed1_b, "a1") eq_(row.keyed1_a, "a1") eq_(row.keyed1_q, "c1") @@ -1654,12 +1726,14 @@ class KeyTargetingTest(fixtures.TablesTest): def test_column_label_overlap_fallback(self): content, bar = self.tables.content, self.tables.bar - row = testing.db.execute(select([content.c.type.label("content_type")])).first() + row = testing.db.execute( + select([content.c.type.label("content_type")])).first() assert content.c.type not in row assert bar.c.content_type not in row assert sql.column('content_type') in row - row = testing.db.execute(select([func.now().label("content_type")])).first() + row = testing.db.execute(select([func.now().label("content_type")])). \ + first() assert content.c.type not in row assert bar.c.content_type not in row assert sql.column('content_type') in row @@ -1721,7 +1795,8 @@ class KeyTargetingTest(fixtures.TablesTest): # this is also addressed by [ticket:2932] a, b = sql.column('keyed2_a'), sql.column('keyed2_b') - stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(a, b) + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + a, b) row = testing.db.execute(stmt).first() assert keyed2.c.a in row @@ -1737,7 +1812,7 @@ class KeyTargetingTest(fixtures.TablesTest): # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - keyed2_a=CHAR, keyed2_b=CHAR) + keyed2_a=CHAR, keyed2_b=CHAR) row = testing.db.execute(stmt).first() assert keyed2.c.a in row @@ -1746,7 +1821,6 @@ class KeyTargetingTest(fixtures.TablesTest): assert stmt.c.keyed2_b in row - class LimitTest(fixtures.TestBase): __backend__ = True @@ -1754,11 +1828,13 @@ class LimitTest(fixtures.TestBase): def setup_class(cls): global users, addresses, metadata metadata = MetaData(testing.db) - users = Table('query_users', metadata, - Column('user_id', INT, primary_key = True), + users = Table( + 'query_users', metadata, + Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20)), ) - addresses = Table('query_addresses', metadata, + addresses = Table( + 'query_addresses', metadata, Column('address_id', Integer, primary_key=True), Column('user_id', Integer, ForeignKey('query_users.user_id')), Column('address', String(30))) @@ -1784,22 +1860,27 @@ class LimitTest(fixtures.TestBase): metadata.drop_all() def test_select_limit(self): - r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall() + r = users.select(limit=3, order_by=[users.c.user_id]).execute(). \ + fetchall() self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r)) @testing.requires.offset def test_select_limit_offset(self): """Test the interaction between limit and offset""" - r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall() - self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')]) - r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall() - self.assert_(r==[(6, 'ralph'), (7, 'fido')]) + r = users.select(limit=3, offset=2, order_by=[users.c.user_id]). \ + execute().fetchall() + self.assert_(r == [(3, 'ed'), (4, 'wendy'), (5, 'laura')]) + r = users.select(offset=5, order_by=[users.c.user_id]).execute(). \ + fetchall() + self.assert_(r == [(6, 'ralph'), (7, 'fido')]) def test_select_distinct_limit(self): """Test the interaction between limit and distinct""" - r = sorted([x[0] for x in select([addresses.c.address]).distinct().limit(3).order_by(addresses.c.address).execute().fetchall()]) + r = sorted( + [x[0] for x in select([addresses.c.address]).distinct(). + limit(3).order_by(addresses.c.address).execute().fetchall()]) self.assert_(len(r) == 3, repr(r)) self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) @@ -1808,10 +1889,10 @@ class LimitTest(fixtures.TestBase): def test_select_distinct_offset(self): """Test the interaction between distinct and offset""" - r = sorted([x[0] for x in - select([addresses.c.address]).distinct(). - offset(1).order_by(addresses.c.address). - execute().fetchall()]) + r = sorted( + [x[0] for x in select([addresses.c.address]).distinct(). + offset(1).order_by(addresses.c.address). + execute().fetchall()]) eq_(len(r), 4) self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r)) @@ -1819,13 +1900,15 @@ class LimitTest(fixtures.TestBase): def test_select_distinct_limit_offset(self): """Test the interaction between limit and limit/offset""" - r = select([addresses.c.address]).order_by(addresses.c.address).distinct().offset(2).limit(3).execute().fetchall() + r = select([addresses.c.address]).order_by(addresses.c.address). \ + distinct().offset(2).limit(3).execute().fetchall() self.assert_(len(r) == 3, repr(r)) self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) + class CompoundTest(fixtures.TestBase): - """test compound statements like UNION, INTERSECT, particularly their ability to nest on - different databases.""" + """test compound statements like UNION, INTERSECT, particularly their + ability to nest on different databases.""" __backend__ = True @@ -1833,19 +1916,27 @@ class CompoundTest(fixtures.TestBase): def setup_class(cls): global metadata, t1, t2, t3 metadata = MetaData(testing.db) - t1 = Table('t1', metadata, - Column('col1', Integer, test_needs_autoincrement=True, primary_key=True), + t1 = Table( + 't1', metadata, + Column( + 'col1', Integer, test_needs_autoincrement=True, + primary_key=True), Column('col2', String(30)), Column('col3', String(40)), - Column('col4', String(30)) - ) - t2 = Table('t2', metadata, - Column('col1', Integer, test_needs_autoincrement=True, primary_key=True), + Column('col4', String(30))) + t2 = Table( + 't2', metadata, + Column( + 'col1', Integer, test_needs_autoincrement=True, + primary_key=True), Column('col2', String(30)), Column('col3', String(40)), Column('col4', String(30))) - t3 = Table('t3', metadata, - Column('col1', Integer, test_needs_autoincrement=True, primary_key=True), + t3 = Table( + 't3', metadata, + Column( + 'col1', Integer, test_needs_autoincrement=True, + primary_key=True), Column('col2', String(30)), Column('col3', String(40)), Column('col4', String(30))) @@ -1926,7 +2017,9 @@ class CompoundTest(fixtures.TestBase): eq_(u.alias('bar').select().execute().fetchall(), wanted) @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery") + @testing.fails_on( + 'firebird', + "has trouble extracting anonymous column from union subquery") @testing.fails_on('mysql', 'FIXME: unknown') @testing.fails_on('sqlite', 'FIXME: unknown') def test_union_all(self): @@ -1938,7 +2031,7 @@ class CompoundTest(fixtures.TestBase): ) ) - wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] + wanted = [('aaa',), ('aaa',), ('bbb',), ('bbb',), ('ccc',), ('ccc',)] found1 = self._fetchall_sorted(e.execute()) eq_(found1, wanted) @@ -1962,7 +2055,7 @@ class CompoundTest(fixtures.TestBase): select([u.c.col3]) ) - wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] + wanted = [('aaa',), ('aaa',), ('bbb',), ('bbb',), ('ccc',), ('ccc',)] found1 = self._fetchall_sorted(e.execute()) eq_(found1, wanted) @@ -1973,7 +2066,7 @@ class CompoundTest(fixtures.TestBase): def test_intersect(self): i = intersect( select([t2.c.col3, t2.c.col4]), - select([t2.c.col3, t2.c.col4], t2.c.col4==t3.c.col3) + select([t2.c.col3, t2.c.col4], t2.c.col4 == t3.c.col3) ) wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] @@ -2024,24 +2117,23 @@ class CompoundTest(fixtures.TestBase): def test_except_style3(self): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( - select([t1.c.col3]), # aaa, bbb, ccc + select([t1.c.col3]), # aaa, bbb, ccc except_( - select([t2.c.col3]), # aaa, bbb, ccc - select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc + select([t2.c.col3]), # aaa, bbb, ccc + select([t3.c.col3], t3.c.col3 == 'ccc'), # ccc ) ) eq_(e.execute().fetchall(), [('ccc',)]) - eq_(e.alias('foo').select().execute().fetchall(), - [('ccc',)]) + eq_(e.alias('foo').select().execute().fetchall(), [('ccc',)]) @testing.requires.except_ def test_except_style4(self): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( - select([t1.c.col3]), # aaa, bbb, ccc + select([t1.c.col3]), # aaa, bbb, ccc except_( - select([t2.c.col3]), # aaa, bbb, ccc - select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc + select([t2.c.col3]), # aaa, bbb, ccc + select([t3.c.col3], t3.c.col3 == 'ccc'), # ccc ).alias().select() ) @@ -2116,6 +2208,8 @@ class CompoundTest(fixtures.TestBase): found = self._fetchall_sorted(ua.select().execute()) eq_(found, wanted) +t1 = t2 = t3 = None + class JoinTest(fixtures.TestBase): """Tests join execution. @@ -2173,7 +2267,7 @@ class JoinTest(fixtures.TestBase): def test_join_x1(self): """Joins t1->t2.""" - for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id): + for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id): expr = select( [t1.c.t1_id, t2.c.t2_id], from_obj=[t1.join(t2, criteria)]) @@ -2182,7 +2276,7 @@ class JoinTest(fixtures.TestBase): def test_join_x2(self): """Joins t1->t2->t3.""" - for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id): + for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id): expr = select( [t1.c.t1_id, t2.c.t2_id], from_obj=[t1.join(t2, criteria)]) @@ -2191,7 +2285,7 @@ class JoinTest(fixtures.TestBase): def test_outerjoin_x1(self): """Outer joins t1->t2.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id], from_obj=[t1.join(t2).join(t3, criteria)]) @@ -2200,147 +2294,129 @@ class JoinTest(fixtures.TestBase): def test_outerjoin_x2(self): """Outer joins t1->t2,t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - from_obj=[t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). \ + from_obj=[t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria)]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None), (12, None, None)]) + self.assertRows( + expr, [(10, 20, 30), (11, 21, None), (12, None, None)]) def test_outerjoin_where_x2_t1(self): """Outer joins t1->t2,t3, where on t1.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t1.c.name == 't1 #10', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t1.c.t1_id < 12, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) def test_outerjoin_where_x2_t2(self): """Outer joins t1->t2,t3, where on t2.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t2.c.name == 't2 #20', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t2.c.t2_id < 29, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) - - def test_outerjoin_where_x2_t1t2(self): - """Outer joins t1->t2,t3, where on t1 and t2.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 19, 29 > t2.c.t2_id), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) def test_outerjoin_where_x2_t3(self): """Outer joins t1->t2,t3, where on t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t3.c.name == 't3 #30', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t3.c.t3_id < 39, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) def test_outerjoin_where_x2_t1t3(self): """Outer joins t1->t2,t3, where on t1 and t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.name == 't1 #10', t3.c.name == 't3 #30'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.t1_id < 19, t3.c.t3_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) def test_outerjoin_where_x2_t1t2(self): """Outer joins t1->t2,t3, where on t1 and t2.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.t1_id < 12, t2.c.t2_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) def test_outerjoin_where_x2_t1t2t3(self): """Outer joins t1->t2,t3, where on t1, t2 and t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20', t3.c.name == 't3 #30'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 19, - t2.c.t2_id < 29, - t3.c.t3_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) + and_(t1.c.t1_id < 19, t2.c.t2_id < 29, t3.c.t3_id < 39), + from_obj=[ + (t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). + outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) def test_mixed(self): """Joins t1->t2, outer t2->t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) @@ -2350,7 +2426,7 @@ class JoinTest(fixtures.TestBase): def test_mixed_where(self): """Joins t1->t2, outer t2->t3, plus a where on each table in turn.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t1.c.name == 't1 #10', @@ -2389,6 +2465,8 @@ class JoinTest(fixtures.TestBase): from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) +metadata = flds = None + class OperatorTest(fixtures.TestBase): __backend__ = True @@ -2397,11 +2475,14 @@ class OperatorTest(fixtures.TestBase): def setup_class(cls): global metadata, flds metadata = MetaData(testing.db) - flds = Table('flds', metadata, - Column('idcol', Integer, primary_key=True, test_needs_autoincrement=True), + flds = Table( + 'flds', metadata, + Column( + 'idcol', Integer, primary_key=True, + test_needs_autoincrement=True), Column('intcol', Integer), Column('strcol', String(50)), - ) + ) metadata.create_all() flds.insert().execute([ @@ -2413,11 +2494,10 @@ class OperatorTest(fixtures.TestBase): def teardown_class(cls): metadata.drop_all() - # TODO: seems like more tests warranted for this setup. @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") def test_modulo(self): eq_( select([flds.c.intcol % 3], @@ -2433,6 +2513,3 @@ class OperatorTest(fixtures.TestBase): ]).execute().fetchall(), [(13, 1), (5, 2)] ) - - - |