diff options
| -rw-r--r-- | test/dialect/mysql.py | 11 | ||||
| -rw-r--r-- | test/engine/reflection.py | 16 | ||||
| -rw-r--r-- | test/orm/inheritance/basic.py | 5 | ||||
| -rw-r--r-- | test/orm/mapper.py | 5 | ||||
| -rw-r--r-- | test/orm/query.py | 195 | ||||
| -rw-r--r-- | test/sql/generative.py | 143 | ||||
| -rwxr-xr-x | test/sql/selectable.py | 72 | ||||
| -rw-r--r-- | test/sql/testtypes.py | 41 | ||||
| -rw-r--r-- | test/testlib/testing.py | 63 |
9 files changed, 280 insertions, 271 deletions
diff --git a/test/dialect/mysql.py b/test/dialect/mysql.py index 0a9153327..df6a2547b 100644 --- a/test/dialect/mysql.py +++ b/test/dialect/mysql.py @@ -1,5 +1,5 @@ import testbase -import sets, warnings +import sets from sqlalchemy import * from sqlalchemy import sql, exceptions from sqlalchemy.databases import mysql @@ -627,6 +627,7 @@ class TypesTest(AssertMixin): def_table.drop() @testing.exclude('mysql', '<', (5, 0, 0)) + @testing.uses_deprecated('Using String type with no length') def test_type_reflection(self): # (ask_for, roundtripped_as_if_different) specs = [( String(), mysql.MSText(), ), @@ -665,13 +666,7 @@ class TypesTest(AssertMixin): m = MetaData(db) t_table = Table('mysql_types', m, *columns) try: - try: - warnings.filterwarnings('ignore', - 'Using String type with no length.*') - m.create_all() - finally: - warnings.filterwarnings("always", - 'Using String type with no length.*') + m.create_all() m2 = MetaData(db) rt = Table('mysql_types', m2, autoload=True) diff --git a/test/engine/reflection.py b/test/engine/reflection.py index ea56e315b..c123b930a 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -1,6 +1,5 @@ import testbase -import pickle, StringIO, unicodedata, warnings - +import pickle, StringIO, unicodedata from sqlalchemy import * from sqlalchemy import exceptions from sqlalchemy import types as sqltypes @@ -206,21 +205,20 @@ class ReflectionTest(PersistTest): dialect_module.ischema_names = {} try: try: - warnings.filterwarnings('error', 'Did not recognize type') m2 = MetaData(testbase.db) t2 = Table("test", m2, autoload=True) assert False - except RuntimeWarning: + except exceptions.SAWarning: assert True - warnings.filterwarnings('ignore', 'Did not recognize type') - m3 = MetaData(testbase.db) - t3 = Table("test", m3, autoload=True) - assert t3.c.foo.type.__class__ == sqltypes.NullType + @testing.emits_warning('Did not recognize type') + def warns(): + m3 = MetaData(testbase.db) + t3 = Table("test", m3, autoload=True) + assert t3.c.foo.type.__class__ == sqltypes.NullType finally: dialect_module.ischema_names = ischema_names - warnings.filterwarnings('always', 'Did not recognize type') t.drop() def test_override_fkandpkcol(self): diff --git a/test/orm/inheritance/basic.py b/test/orm/inheritance/basic.py index 1e46a3f6b..f2b7c6e4f 100644 --- a/test/orm/inheritance/basic.py +++ b/test/orm/inheritance/basic.py @@ -1,5 +1,4 @@ import testbase -import warnings from sqlalchemy import * from sqlalchemy import exceptions, util from sqlalchemy.orm import * @@ -550,14 +549,12 @@ class DistinctPKTest(ORMTest): self._do_test(True) def test_explicit_composite_pk(self): - warnings.filterwarnings("error", r".*On mapper.*distinct primary key") - person_mapper = mapper(Person, person_table) try: mapper(Employee, employee_table, inherits=person_mapper, primary_key=[person_table.c.id, employee_table.c.id]) self._do_test(True) assert False - except RuntimeWarning, e: + except exceptions.SAWarning, e: assert str(e) == "On mapper Mapper|Employee|employees, primary key column 'employees.id' is being combined with distinct primary key column 'persons.id' in attribute 'id'. Use explicit properties to give each column its own mapped attribute name.", str(e) def test_explicit_pk(self): diff --git a/test/orm/mapper.py b/test/orm/mapper.py index ca2f83211..464979f4c 100644 --- a/test/orm/mapper.py +++ b/test/orm/mapper.py @@ -130,15 +130,12 @@ class MapperTest(MapperSuperTest): def bad_expunge(foo): raise Exception("this exception should be stated as a warning") - import warnings - warnings.filterwarnings("always", r".*this exception should be stated as a warning") - sess.expunge = bad_expunge try: Foo(_sa_session=sess) assert False except Exception, e: - assert e is ex + assert isinstance(e, exceptions.SAWarning) clear_mappers() diff --git a/test/orm/query.py b/test/orm/query.py index 233e4ac7f..2d06be363 100644 --- a/test/orm/query.py +++ b/test/orm/query.py @@ -12,7 +12,7 @@ from testlib.fixtures import * class QueryTest(FixtureTest): keep_mappers = True keep_data = True - + def setUpAll(self): super(QueryTest, self).setUpAll() self.setup_mappers() @@ -60,46 +60,41 @@ class GetTest(QueryTest): s.clear() u2 = s.query(User).get(7) assert u is not u2 - + def test_no_criterion(self): """test that get()/load() does not use preexisting filter/etc. criterion""" s = create_session() - - import warnings - warnings.filterwarnings("error", r".*Query.*") - print "---------------------------------------" try: s.query(User).join('addresses').filter(Address.user_id==8).get(7) assert False - except RuntimeWarning, e: + except exceptions.SAWarning, e: assert str(e) == "Query.get() being called on a Query with existing criterion; criterion is being ignored." - warnings.filterwarnings("once", r".*Query.*") - - assert s.query(User).filter(User.id==7).get(19) is None + @testing.emits_warning('Query.*') + def warns(): + assert s.query(User).filter(User.id==7).get(19) is None - u = s.query(User).get(7) - assert s.query(User).filter(User.id==9).get(7) is u - s.clear() - assert s.query(User).filter(User.id==9).get(7).id == u.id + u = s.query(User).get(7) + assert s.query(User).filter(User.id==9).get(7) is u + s.clear() + assert s.query(User).filter(User.id==9).get(7).id == u.id - # user 10 has no addresses - u = s.query(User).get(10) - assert s.query(User).join('addresses').get(10) is u - s.clear() - assert s.query(User).join('addresses').get(10).id == u.id - - u = s.query(User).get(7) - assert s.query(User).join('addresses').filter(Address.user_id==8).filter(User.id==7).first() is None - assert s.query(User).join('addresses').filter(Address.user_id==8).get(7) is u - s.clear() - assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id - - assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id + # user 10 has no addresses + u = s.query(User).get(10) + assert s.query(User).join('addresses').get(10) is u + s.clear() + assert s.query(User).join('addresses').get(10).id == u.id + u = s.query(User).get(7) + assert s.query(User).join('addresses').filter(Address.user_id==8).filter(User.id==7).first() is None + assert s.query(User).join('addresses').filter(Address.user_id==8).get(7) is u + s.clear() + assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id + + assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id + warns() - def test_unique_param_names(self): class SomeUser(object): pass @@ -250,7 +245,7 @@ class OperatorTest(QueryTest): def test_op(self): assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name_1" - + def test_in(self): self._test(User.id.in_(['a', 'b']), "users.id IN (:users_id_1, :users_id_2)") @@ -391,7 +386,7 @@ class FilterTest(QueryTest): # one to many generates WHERE NOT EXISTS assert [User(name='chuck')] == sess.query(User).filter_by(addresses = None).all() - + class AggregateTest(QueryTest): def test_sum(self): sess = create_session() @@ -407,7 +402,7 @@ class AggregateTest(QueryTest): assert [User(name=u'ed',id=8)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)> 2).all() assert [User(name=u'jack',id=7), User(name=u'fred',id=9)] == sess.query(User).group_by([c for c in User.c]).join('addresses').having(func.count(Address.c.id)< 2).all() - + class CountTest(QueryTest): def test_basic(self): assert 4 == create_session().query(User).count() @@ -421,28 +416,28 @@ class DistinctTest(QueryTest): def test_joined(self): """test that orderbys from a joined table get placed into the columns clause when DISTINCT is used""" - + sess = create_session() q = sess.query(User).join('addresses').distinct().order_by(desc(Address.email_address)) assert [User(id=7), User(id=9), User(id=8)] == q.all() sess.clear() - - # test that it works on embedded eagerload/LIMIT subquery + + # test that it works on embedded eagerload/LIMIT subquery q = sess.query(User).join('addresses').distinct().options(eagerload('addresses')).order_by(desc(Address.email_address)).limit(2) def go(): assert [ User(id=7, addresses=[ Address(id=1) - ]), + ]), User(id=9, addresses=[ Address(id=5) - ]), + ]), ] == q.all() self.assert_sql_count(testbase.db, go, 1) - + class YieldTest(QueryTest): def test_basic(self): @@ -463,7 +458,7 @@ class YieldTest(QueryTest): assert False except StopIteration: pass - + class TextTest(QueryTest): def test_fulltext(self): assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users").all() @@ -505,7 +500,7 @@ class ParentTest(QueryTest): # test against None for parent? this can't be done with the current API since we don't know # what mapper to use #assert sess.query(Order).with_parent(None, property='addresses').all() == [Order(description="order 5")] - + def test_noparent(self): sess = create_session() q = sess.query(User) @@ -526,16 +521,16 @@ class ParentTest(QueryTest): class JoinTest(QueryTest): - + def test_getjoinable_tables(self): sess = create_session() - + sel1 = select([users]).alias() sel2 = select([users], from_obj=users.join(addresses)).alias() - + j1 = sel1.join(users, sel1.c.id==users.c.id) j2 = j1.join(addresses) - + for from_obj, assert_cond in ( (users, [users]), (users.join(addresses), [users, addresses]), @@ -544,11 +539,11 @@ class JoinTest(QueryTest): (sel1.join(users, sel1.c.id==users.c.id), [sel1, users]), (sel2.join(users, sel2.c.id==users.c.id), [sel2, users]), (j2, [j1, j2, sel1, users, addresses]) - + ): ret = set(sess.query(User).select_from(from_obj)._get_joinable_tables()) self.assertEquals(ret, set(assert_cond).union([from_obj]), [x.description for x in ret]) - + def test_overlapping_paths(self): for aliased in (True,False): # load a user who has an order that contains item id 3 and address id 1 (order 3, owned by jack) @@ -740,7 +735,7 @@ class InstancesTest(QueryTest): self.assert_sql_count(testbase.db, go, 1) sess.clear() - + def go(): l = q.options(contains_alias('ulist'), contains_eager('addresses')).from_statement(query).all() assert fixtures.user_address_result == l @@ -758,7 +753,7 @@ class InstancesTest(QueryTest): self.assert_sql_count(testbase.db, go, 1) sess.clear() - + def go(): l = q.options(contains_eager('addresses')).from_statement(selectquery).all() assert fixtures.user_address_result[0:3] == l @@ -776,15 +771,15 @@ class InstancesTest(QueryTest): assert fixtures.user_address_result == l self.assert_sql_count(testbase.db, go, 1) sess.clear() - + def go(): # test using the Alias object itself l = q.options(contains_eager('addresses', alias=adalias)).instances(selectquery.execute()) assert fixtures.user_address_result == l self.assert_sql_count(testbase.db, go, 1) - + sess.clear() - + def decorate(row): d = {} for c in addresses.columns: @@ -797,7 +792,7 @@ class InstancesTest(QueryTest): assert fixtures.user_address_result == l self.assert_sql_count(testbase.db, go, 1) sess.clear() - + oalias = orders.alias('o1') ialias = items.alias('i1') query = users.outerjoin(oalias).outerjoin(order_items).outerjoin(ialias).select(use_labels=True).order_by(users.c.id).order_by(oalias.c.id).order_by(ialias.c.id) @@ -809,7 +804,7 @@ class InstancesTest(QueryTest): self.assert_sql_count(testbase.db, go, 1) sess.clear() - + # test using Alias with more than one level deep def go(): l = q.options(contains_eager('orders', alias=oalias), contains_eager('orders.items', alias=ialias)).instances(query.execute()) @@ -839,9 +834,9 @@ class InstancesTest(QueryTest): q = sess.query(User) l = q.instances(selectquery.execute(), Address) assert l == expected - + sess.clear() - + for aliased in (False, True): q = sess.query(User) @@ -862,7 +857,7 @@ class InstancesTest(QueryTest): q = sess.query(User, Address).join('addresses', aliased=aliased).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com') assert q.all() == [(user8, address3)] sess.clear() - + def test_aliased_multi_mappers(self): sess = create_session() @@ -884,7 +879,7 @@ class InstancesTest(QueryTest): assert l == expected sess.clear() - + q = sess.query(User).add_entity(Address, alias=adalias) l = q.select_from(users.outerjoin(adalias)).filter(adalias.c.email_address=='ed@bettyboop.com').all() assert l == [(user8, address3)] @@ -893,18 +888,18 @@ class InstancesTest(QueryTest): sess = create_session() expected = [(u, u.name) for u in sess.query(User).all()] - + for add_col in (User.name, users.c.name, User.c.name): assert sess.query(User).add_column(add_col).all() == expected sess.clear() - + try: sess.query(User).add_column(object()).all() assert False except exceptions.InvalidRequestError, e: assert "Invalid column expression" in str(e) - - + + def test_multi_columns_2(self): """test aliased/nonalised joins with the usage of add_column()""" sess = create_session() @@ -922,13 +917,13 @@ class InstancesTest(QueryTest): l = q.all() assert l == expected sess.clear() - + s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id) q = sess.query(User) l = q.add_column("count").from_statement(s).all() assert l == expected - - + + def test_two_columns(self): sess = create_session() (user7, user8, user9, user10) = sess.query(User).all() @@ -943,9 +938,9 @@ class InstancesTest(QueryTest): q = create_session().query(User) l = q.add_column("count").add_column("concat").from_statement(s).all() assert l == expected - + sess.clear() - + # test with select_from() q = create_session().query(User).add_column(func.count(addresses.c.id))\ .add_column(("Name:" + users.c.name)).select_from(users.outerjoin(addresses))\ @@ -953,7 +948,7 @@ class InstancesTest(QueryTest): assert q.all() == expected sess.clear() - + # test with outerjoin() both aliased and non for aliased in (False, True): q = create_session().query(User).add_column(func.count(addresses.c.id))\ @@ -966,19 +961,19 @@ class InstancesTest(QueryTest): class SelectFromTest(QueryTest): keep_mappers = False - + def setup_mappers(self): pass - + def test_replace_with_select(self): mapper(User, users, properties = { 'addresses':relation(Address) }) mapper(Address, addresses) - + sel = users.select(users.c.id.in_([7, 8])).alias() sess = create_session() - + self.assertEquals(sess.query(User).select_from(sel).all(), [User(id=7), User(id=8)]) self.assertEquals(sess.query(User).select_from(sel).filter(User.c.id==8).all(), [User(id=8)]) @@ -990,8 +985,8 @@ class SelectFromTest(QueryTest): self.assertEquals(sess.query(User).select_from(sel).order_by(asc(User.name)).all(), [ User(name='ed',id=8), User(name='jack',id=7) ]) - - self.assertEquals(sess.query(User).select_from(sel).options(eagerload('addresses')).first(), + + self.assertEquals(sess.query(User).select_from(sel).options(eagerload('addresses')).first(), User(name='jack', addresses=[Address(id=1)]) ) @@ -1004,19 +999,19 @@ class SelectFromTest(QueryTest): sel = users.select(users.c.id.in_([7, 8])) sess = create_session() - self.assertEquals(sess.query(User).select_from(sel).join('addresses').add_entity(Address).order_by(User.id).order_by(Address.id).all(), + self.assertEquals(sess.query(User).select_from(sel).join('addresses').add_entity(Address).order_by(User.id).order_by(Address.id).all(), [ - (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), + (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), + (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)), (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4)) ] ) - self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(), + self.assertEquals(sess.query(User).select_from(sel).join('addresses', aliased=True).add_entity(Address).order_by(User.id).order_by(Address.id).all(), [ - (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), + (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), + (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)), (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4)) ] @@ -1033,10 +1028,10 @@ class SelectFromTest(QueryTest): 'keywords':relation(Keyword, secondary=item_keywords, order_by=keywords.c.id) #m2m }) mapper(Keyword, keywords) - + sel = users.select(users.c.id.in_([7, 8])) sess = create_session() - + self.assertEquals(sess.query(User).select_from(sel).join(['orders', 'items', 'keywords']).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [ User(name=u'jack',id=7) ]) @@ -1049,15 +1044,15 @@ class SelectFromTest(QueryTest): self.assertEquals(sess.query(User).select_from(sel).options(eagerload_all('orders.items.keywords')).join(['orders', 'items', 'keywords'], aliased=True).filter(Keyword.name.in_(['red', 'big', 'round'])).all(), [ User(name=u'jack',orders=[ Order(description=u'order 1',items=[ - Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]), - Item(description=u'item 2',keywords=[Keyword(name=u'red',id=2), Keyword(name=u'small',id=5), Keyword(name=u'square')]), + Item(description=u'item 1',keywords=[Keyword(name=u'red'), Keyword(name=u'big'), Keyword(name=u'round')]), + Item(description=u'item 2',keywords=[Keyword(name=u'red',id=2), Keyword(name=u'small',id=5), Keyword(name=u'square')]), Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]) - ]), + ]), Order(description=u'order 3',items=[ - Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]), - Item(description=u'item 4',keywords=[],id=4), + Item(description=u'item 3',keywords=[Keyword(name=u'green',id=3), Keyword(name=u'big',id=4), Keyword(name=u'round',id=6)]), + Item(description=u'item 4',keywords=[],id=4), Item(description=u'item 5',keywords=[],id=5) - ]), + ]), Order(description=u'order 5',items=[Item(description=u'item 5',keywords=[])])]) ]) self.assert_sql_count(testbase.db, go, 1) @@ -1065,36 +1060,36 @@ class SelectFromTest(QueryTest): sess.clear() sel2 = orders.select(orders.c.id.in_([1,2,3])) self.assertEquals(sess.query(Order).select_from(sel2).join(['items', 'keywords']).filter(Keyword.name == 'red').all(), [ - Order(description=u'order 1',id=1), - Order(description=u'order 2',id=2), + Order(description=u'order 1',id=1), + Order(description=u'order 2',id=2), ]) self.assertEquals(sess.query(Order).select_from(sel2).join(['items', 'keywords'], aliased=True).filter(Keyword.name == 'red').all(), [ - Order(description=u'order 1',id=1), - Order(description=u'order 2',id=2), + Order(description=u'order 1',id=1), + Order(description=u'order 2',id=2), ]) - - + + def test_replace_with_eager(self): mapper(User, users, properties = { 'addresses':relation(Address) }) mapper(Address, addresses) - + sel = users.select(users.c.id.in_([7, 8])) sess = create_session() - + def go(): - self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).all(), + self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).all(), [ - User(id=7, addresses=[Address(id=1)]), + User(id=7, addresses=[Address(id=1)]), User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)]) ] ) self.assert_sql_count(testbase.db, go, 1) sess.clear() - + def go(): - self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(), + self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.c.id==8).all(), [User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])] ) self.assert_sql_count(testbase.db, go, 1) @@ -1103,7 +1098,7 @@ class SelectFromTest(QueryTest): def go(): self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel)[1], User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])) self.assert_sql_count(testbase.db, go, 1) - + class CustomJoinTest(QueryTest): keep_mappers = False @@ -1244,5 +1239,3 @@ class ExternalColumnsTest(QueryTest): if __name__ == '__main__': testbase.main() - - diff --git a/test/sql/generative.py b/test/sql/generative.py index 41b4caebf..c787d3404 100644 --- a/test/sql/generative.py +++ b/test/sql/generative.py @@ -11,10 +11,10 @@ from sqlalchemy.sql import util as sql_util class TraversalTest(AssertMixin): """test ClauseVisitor's traversal, particularly its ability to copy and modify a ClauseElement in place.""" - + def setUpAll(self): global A, B - + # establish two ficticious ClauseElements. # define deep equality semantics as well as deep identity semantics. class A(ClauseElement): @@ -23,16 +23,16 @@ class TraversalTest(AssertMixin): def is_other(self, other): return other is self - + def __eq__(self, other): return other.expr == self.expr - + def __ne__(self, other): return other.expr != self.expr - + def __str__(self): return "A(%s)" % repr(self.expr) - + class B(ClauseElement): def __init__(self, *items): self.items = items @@ -50,22 +50,22 @@ class TraversalTest(AssertMixin): if i1 != i2: return False return True - + def __ne__(self, other): for i1, i2 in zip(self.items, other.items): if i1 != i2: return True return False - - def _copy_internals(self, clone=_clone): + + def _copy_internals(self, clone=_clone): self.items = [clone(i) for i in self.items] def get_children(self, **kwargs): return self.items - + def __str__(self): return "B(%s)" % repr([str(i) for i in self.items]) - + def test_test_classes(self): a1 = A("expr1") struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) @@ -78,22 +78,22 @@ class TraversalTest(AssertMixin): assert struct != struct3 assert not struct.is_other(struct2) assert not struct.is_other(struct3) - - def test_clone(self): + + def test_clone(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) - + class Vis(ClauseVisitor): def visit_a(self, a): pass def visit_b(self, b): pass - + vis = Vis() s2 = vis.traverse(struct, clone=True) assert struct == s2 assert not struct.is_other(s2) - def test_no_clone(self): + def test_no_clone(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) class Vis(ClauseVisitor): @@ -106,7 +106,7 @@ class TraversalTest(AssertMixin): s2 = vis.traverse(struct, clone=False) assert struct == s2 assert struct.is_other(s2) - + def test_change_in_place(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), A("expr2b")), A("expr3")) @@ -136,41 +136,41 @@ class TraversalTest(AssertMixin): s3 = vis2.traverse(struct, clone=True) assert struct != s3 assert struct3 == s3 - - + + class ClauseTest(SQLCompileTest): """test copy-in-place behavior of various ClauseElements.""" - + def setUpAll(self): global t1, t2 - t1 = table("table1", + t1 = table("table1", column("col1"), column("col2"), column("col3"), ) - t2 = table("table2", + t2 = table("table2", column("col1"), column("col2"), column("col3"), ) - + def test_binary(self): clause = t1.c.col2 == t2.c.col2 assert str(clause) == ClauseVisitor().traverse(clause, clone=True) - + def test_join(self): clause = t1.join(t2, t1.c.col2==t2.c.col2) c1 = str(clause) assert str(clause) == str(ClauseVisitor().traverse(clause, clone=True)) - + class Vis(ClauseVisitor): def visit_binary(self, binary): binary.right = t2.c.col3 - + clause2 = Vis().traverse(clause, clone=True) assert c1 == str(clause) assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3)) - + def test_text(self): clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')]) c1 = str(clause) @@ -178,13 +178,13 @@ class ClauseTest(SQLCompileTest): def visit_textclause(self, text): text.text = text.text + " SOME MODIFIER=:lala" text.bindparams['lala'] = bindparam('lala') - + clause2 = Vis().traverse(clause, clone=True) assert c1 == str(clause) assert str(clause2) == c1 + " SOME MODIFIER=:lala" assert clause.bindparams.keys() == ['bar'] assert util.Set(clause2.bindparams.keys()) == util.Set(['bar', 'lala']) - + def test_select(self): s2 = select([t1]) s2_assert = str(s2) @@ -201,7 +201,7 @@ class ClauseTest(SQLCompileTest): assert str(s2) == s3_assert print "------------------" - + s4_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col3==9))) class Vis(ClauseVisitor): def visit_select(self, select): @@ -211,7 +211,7 @@ class ClauseTest(SQLCompileTest): print str(s4) assert str(s4) == s4_assert assert str(s3) == s3_assert - + print "------------------" s5_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col1==9))) class Vis(ClauseVisitor): @@ -224,10 +224,10 @@ class ClauseTest(SQLCompileTest): print str(s5) assert str(s5) == s5_assert assert str(s4) == s4_assert - + def test_binds(self): """test that unique bindparams change their name upon clone() to prevent conflicts""" - + s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias() s2 = ClauseVisitor().traverse(s, clone=True).alias() s3 = select([s], s.c.col2==s2.c.col2) @@ -236,7 +236,7 @@ class ClauseTest(SQLCompileTest): "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) AS anon_1, "\ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 "\ "WHERE anon_1.col2 = anon_2.col2") - + s = select([t1], t1.c.col1==4).alias() s2 = ClauseVisitor().traverse(s, clone=True).alias() s3 = select([s], s.c.col2==s2.c.col2) @@ -244,7 +244,8 @@ class ClauseTest(SQLCompileTest): "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_1) AS anon_1, "\ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_2) AS anon_2 "\ "WHERE anon_1.col2 = anon_2.col2") - + + @testing.emits_warning('.*replaced by another column with the same key') def test_alias(self): subq = t2.select().alias('subq') s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)]) @@ -260,38 +261,38 @@ class ClauseTest(SQLCompileTest): s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3, clone=True) assert orig == str(s) == str(s3) == str(s4) - + def test_correlated_select(self): s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2) class Vis(ClauseVisitor): def visit_select(self, select): select.append_whereclause(t1.c.col2==7) - + self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2_1") class ClauseAdapterTest(SQLCompileTest): def setUpAll(self): global t1, t2 - t1 = table("table1", + t1 = table("table1", column("col1"), column("col2"), column("col3"), ) - t2 = table("table2", + t2 = table("table2", column("col1"), column("col2"), column("col3"), ) - + def test_table_to_alias(self): - + t1alias = t1.alias('t1alias') - + vis = sql_util.ClauseAdapter(t1alias) ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True) assert ff._get_from_objects() == [t1alias] - + self.assert_compile(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias") self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2") @@ -306,17 +307,17 @@ class ClauseAdapterTest(SQLCompileTest): ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True) self.assert_compile(ff, "count(t1alias.col1) AS foo") assert ff._get_from_objects() == [t1alias] - + # TODO: # self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias") - + t2alias = t2.alias('t2alias') vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2") - + def test_include_exclude(self): m = MetaData() a=Table( 'a',m, @@ -331,7 +332,7 @@ class ClauseAdapterTest(SQLCompileTest): e = sql_util.ClauseAdapter( b, include= set([ a.c.id ]), equivalents= { a.c.id: set([ a.c.id]) } ).traverse( e) - + assert str(e) == "a_1.id = a.xxx_id" def test_join_to_alias(self): @@ -363,7 +364,7 @@ class ClauseAdapterTest(SQLCompileTest): " LEFT OUTER JOIN d ON a_id = d.aid") j5 = j3.alias('foo') j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0] - + # this statement takes c join(a join b), wraps it inside an aliased "select * from c join(a join b) AS foo". # the outermost right side "left outer join d" stays the same, except "d" joins against foo.a_id instead # of plain "a_id" @@ -371,30 +372,30 @@ class ClauseAdapterTest(SQLCompileTest): "c JOIN (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid FROM a LEFT OUTER JOIN b ON a.id = b.aid) " "ON b_id = c.bid) AS foo" " LEFT OUTER JOIN d ON foo.a_id = d.aid") - + def test_derived_from(self): assert select([t1]).is_derived_from(t1) assert not select([t2]).is_derived_from(t1) assert not t1.is_derived_from(select([t1])) assert t1.alias().is_derived_from(t1) - - + + s1 = select([t1, t2]).alias('foo') s2 = select([s1]).limit(5).offset(10).alias() assert s2.is_derived_from(s1) s2 = s2._clone() assert s2.is_derived_from(s1) - + def test_aliasedselect_to_aliasedselect(self): # original issue from ticket #904 s1 = select([t1]).alias('foo') s2 = select([s1]).limit(5).offset(10).alias() - self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1), + self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1), "SELECT foo.col1, foo.col2, foo.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10") - + j = s1.outerjoin(t2, s1.c.col1==t2.c.col1) - self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), + self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), "SELECT anon_1.col1, anon_1.col2, anon_1.col3, table2.col1, table2.col2, table2.col3 FROM "\ "(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\ @@ -402,40 +403,40 @@ class ClauseAdapterTest(SQLCompileTest): talias = t1.alias('bar') j = s1.outerjoin(talias, s1.c.col1==talias.c.col1) - self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), + self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), "SELECT anon_1.col1, anon_1.col2, anon_1.col3, bar.col1, bar.col2, bar.col3 FROM "\ "(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\ "LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1") - - + + class SelectTest(SQLCompileTest): """tests the generative capability of Select""" def setUpAll(self): global t1, t2 - t1 = table("table1", + t1 = table("table1", column("col1"), column("col2"), column("col3"), ) - t2 = table("table2", + t2 = table("table2", column("col1"), column("col2"), column("col3"), ) - + def test_select(self): - self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), + self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1_1 ORDER BY table1.col3") - - self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), + + self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\ "FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3") - + s = select([t2], t2.c.col1==t1.c.col1, correlate=False) s = s.correlate(t1).order_by(t2.c.col3) - self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), + self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\ "FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3") @@ -444,7 +445,7 @@ class SelectTest(SQLCompileTest): self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") select_copy = s.column('yyy') self.assert_compile(select_copy, "SELECT table1.col1, table1.col2, table1.col3, yyy FROM table1") - assert s.columns is not select_copy.columns + assert s.columns is not select_copy.columns assert s._columns is not select_copy._columns assert s._raw_columns is not select_copy._raw_columns self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") @@ -464,8 +465,8 @@ class SelectTest(SQLCompileTest): self.assert_compile(s2, "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 " "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") - - s3 = s.correlate(None) + + s3 = s.correlate(None) self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 " "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") @@ -479,7 +480,7 @@ class SelectTest(SQLCompileTest): self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, " "(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 " "WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2") - + def test_prefixes(self): s = t1.select() self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") @@ -488,4 +489,4 @@ class SelectTest(SQLCompileTest): self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") if __name__ == '__main__': - testbase.main() + testbase.main() diff --git a/test/sql/selectable.py b/test/sql/selectable.py index 1b9959ec4..850e29e16 100755 --- a/test/sql/selectable.py +++ b/test/sql/selectable.py @@ -1,18 +1,18 @@ """tests that various From objects properly export their columns, as well as useable primary keys and foreign keys. Full relational algebra depends on every selectable unit behaving nicely with others..""" - + import testbase from sqlalchemy import * from testlib import * metadata = MetaData() -table = Table('table1', metadata, +table = Table('table1', metadata, Column('col1', Integer, primary_key=True), Column('col2', String(20)), Column('col3', Integer), Column('colx', Integer), - + ) table2 = Table('table2', metadata, @@ -31,47 +31,47 @@ class SelectableTest(AssertMixin): #assert s.corresponding_column(table.c.col1) is s.c.col1 assert s.corresponding_column(s.c.col1) is s.c.col1 assert s.corresponding_column(s.c.c1) is s.c.c1 - + def testjoinagainstself(self): jj = select([table.c.col1.label('bar_col1')]) jjj = join(table, jj, table.c.col1==jj.c.bar_col1) - + # test column directly agaisnt itself assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 - - # test alias of the join, targets the column with the least + + # test alias of the join, targets the column with the least # "distance" between the requested column and the returned column # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than # there is from j2.c.bar_col1 to table.c.col1) j2 = jjj.alias('foo') assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1 - + def testselectontable(self): sel = select([table, table2], use_labels=True) assert sel.corresponding_column(table.c.col1) is sel.c.table1_col1 assert sel.corresponding_column(table.c.col1, require_embedded=True) is sel.c.table1_col1 assert table.corresponding_column(sel.c.table1_col1) is table.c.col1 assert table.corresponding_column(sel.c.table1_col1, require_embedded=True) is None - + def testjoinagainstjoin(self): j = outerjoin(table, table2, table.c.col1==table2.c.col2) jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo') jjj = join(table, jj, table.c.col1==jj.c.bar_col1) assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 - + j2 = jjj.alias('foo') print j2.corresponding_column(jjj.c.table1_col1) assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1 - + assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1 - + def testtablealias(self): a = table.alias('a') - + j = join(a, table2) - + criterion = a.c.col1 == table2.c.col2 print print str(j) @@ -124,7 +124,7 @@ class SelectableTest(AssertMixin): j1 = table.join(table2) assert u.corresponding_column(j1.c.table1_colx) is u.c.colx assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx - + def testjoin(self): a = join(table, table2) print str(a.select(use_labels=True)) @@ -138,7 +138,7 @@ class SelectableTest(AssertMixin): a = table.select().alias('a') print str(a.select()) j = join(a, table2) - + criterion = a.c.col1 == table2.c.col2 print criterion print j.onclause @@ -148,7 +148,7 @@ class SelectableTest(AssertMixin): a = table.select(use_labels=True) print str(a.select()) j = join(a, table2) - + criterion = a.c.table1_col1 == table2.c.col2 print print str(j) @@ -163,17 +163,17 @@ class SelectableTest(AssertMixin): criterion = a.c.acol1 == table2.c.col2 print str(j) self.assert_(criterion.compare(j.onclause)) - + def testselectaliaslabels(self): a = table2.select(use_labels=True).alias('a') print str(a.select()) j = join(a, table) - + criterion = table.c.col1 == a.c.table2_col2 print str(criterion) print str(j.onclause) self.assert_(criterion.compare(j.onclause)) - + def testtablejoinedtoselectoftable(self): metadata = MetaData() a = Table('a', metadata, @@ -195,9 +195,10 @@ class SelectableTest(AssertMixin): assert j4.corresponding_column(j2.c.aid) is j4.c.aid assert j4.corresponding_column(a.c.id) is j4.c.id + @testing.emits_warning('.*replaced by another column with the same key') def test_oid(self): # the oid column of a selectable currently proxies all - # oid columns found within. + # oid columns found within. s = table.select() s2 = table2.select() s3 = select([s, s2]) @@ -205,18 +206,18 @@ class SelectableTest(AssertMixin): assert s3.corresponding_column(table2.oid_column) is s3.oid_column assert s3.corresponding_column(s.oid_column) is s3.oid_column assert s3.corresponding_column(s2.oid_column) is s3.oid_column - + u = s.union(s2) assert u.corresponding_column(table.oid_column) is u.oid_column assert u.corresponding_column(table2.oid_column) is u.oid_column assert u.corresponding_column(s.oid_column) is u.oid_column assert u.corresponding_column(s2.oid_column) is u.oid_column - + class PrimaryKeyTest(AssertMixin): def test_join_pk_collapse_implicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, + """test that redundant columns in a join get 'collapsed' into a minimal primary key, which is the root column along a chain of foreign key relationships.""" - + meta = MetaData() a = Table('a', meta, Column('id', Integer, primary_key=True)) b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True)) @@ -225,7 +226,7 @@ class PrimaryKeyTest(AssertMixin): assert c.c.id.references(b.c.id) assert not d.c.id.references(a.c.id) - + assert list(a.join(b).primary_key) == [a.c.id] assert list(b.join(c).primary_key) == [b.c.id] assert list(a.join(b).join(c).primary_key) == [a.c.id] @@ -234,7 +235,7 @@ class PrimaryKeyTest(AssertMixin): assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id] def test_join_pk_collapse_explicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, + """test that redundant columns in a join get 'collapsed' into a minimal primary key, which is the root column along a chain of explicit join conditions.""" meta = MetaData() @@ -251,9 +252,9 @@ class PrimaryKeyTest(AssertMixin): assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id] assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id] assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id] - + assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id] - + def test_init_doesnt_blowitaway(self): meta = MetaData() a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) @@ -261,7 +262,7 @@ class PrimaryKeyTest(AssertMixin): j = a.join(b) assert list(j.primary_key) == [a.c.id] - + j.foreign_keys assert list(j.primary_key) == [a.c.id] @@ -279,20 +280,20 @@ class DerivedTest(AssertMixin): meta = MetaData() t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - + assert t1.is_derived_from(t1) assert not t2.is_derived_from(t1) - - def test_alias(self): + + def test_alias(self): meta = MetaData() t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) - + assert t1.alias().is_derived_from(t1) assert not t2.alias().is_derived_from(t1) assert not t1.is_derived_from(t1.alias()) assert not t1.is_derived_from(t2.alias()) - + def test_select(self): meta = MetaData() t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30))) @@ -309,4 +310,3 @@ class DerivedTest(AssertMixin): if __name__ == "__main__": testbase.main() - diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py index c11f8fcd1..8f1cc461e 100644 --- a/test/sql/testtypes.py +++ b/test/sql/testtypes.py @@ -1,6 +1,5 @@ import testbase -import pickleable -import datetime, os, re +import datetime, os, pickleable, re from sqlalchemy import * from sqlalchemy import types, exceptions from sqlalchemy.sql import operators @@ -341,24 +340,11 @@ class UnicodeTest(AssertMixin): self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata) def testassert(self): - import warnings - - warnings.filterwarnings("always", r".*non-unicode bind") - - ## test that data still goes in if warning is emitted.... - unicode_table.insert().execute(unicode_varchar='not unicode') - assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() - == [('not unicode', )]) - - warnings.filterwarnings("error", r".*non-unicode bind") try: - try: - unicode_table.insert().execute(unicode_varchar='not unicode') - assert False - except RuntimeWarning, e: - assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e) - finally: - warnings.filterwarnings("always", r".*non-unicode bind") + unicode_table.insert().execute(unicode_varchar='not unicode') + assert False + except exceptions.SAWarning, e: + assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e) unicode_engine = engines.utf8_engine(options={'convert_unicode':True, 'assert_unicode':True}) @@ -368,6 +354,14 @@ class UnicodeTest(AssertMixin): assert False except exceptions.InvalidRequestError, e: assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'" + + @testing.emits_warning('.*non-unicode bind') + def warns(): + # test that data still goes in if warning is emitted.... + unicode_table.insert().execute(unicode_varchar='not unicode') + assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )]) + warns() + finally: unicode_engine.dispose() @@ -674,11 +668,6 @@ class StringTest(AssertMixin): foo =Table('foo', metadata, Column('one', String)) - import warnings - from sqlalchemy.logging import SADeprecationWarning - - warnings.filterwarnings("error", r"Using String type with no length.*") - # no warning select([func.count("*")], bind=testbase.db).execute() @@ -686,7 +675,7 @@ class StringTest(AssertMixin): # warning during CREATE foo.create() assert False - except SADeprecationWarning, e: + except exceptions.SADeprecationWarning, e: assert "Using String type with no length" in str(e) assert re.search(r'\bone\b', str(e)) @@ -700,8 +689,6 @@ class StringTest(AssertMixin): select([func.count("*")], from_obj=bar).execute() finally: bar.drop() - warnings.filterwarnings("always", r"Using String type with no length.*") - class NumericTest(AssertMixin): def setUpAll(self): diff --git a/test/testlib/testing.py b/test/testlib/testing.py index 526700c2a..1231dc126 100644 --- a/test/testlib/testing.py +++ b/test/testlib/testing.py @@ -7,7 +7,7 @@ import itertools, unittest, re, sys, os, operator, warnings from cStringIO import StringIO import testlib.config as config sql, MetaData, clear_mappers, Session, util = None, None, None, None, None -salogging = None +sa_exceptions = None __all__ = ('PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest') @@ -169,6 +169,46 @@ def _server_version(bind=None): bind = config.db return bind.dialect.server_version_info(bind.contextual_connect()) +def emits_warning(*messages): + """Mark a test as emitting a warning. + + With no arguments, squelches all SAWarning failures. Or pass one or more + strings; these will be matched to the root of the warning description by + warnings.filterwarnings(). + """ + + # TODO: it would be nice to assert that a named warning was + # emitted. should work with some monkeypatching of warnings, + # and may work on non-CPython if they keep to the spirit of + # warnings.showwarning's docstring. + # - update: jython looks ok, it uses cpython's module + def decorate(fn): + def safe(*args, **kw): + global sa_exceptions + if sa_exceptions is None: + import sqlalchemy.exceptions as sa_exceptions + + if not messages: + filters = [dict(action='ignore', + category=sa_exceptions.SAWarning)] + else: + filters = [dict(action='ignore', + message=message, + category=sa_exceptions.SAWarning) + for message in messages ] + for f in filters: + warnings.filterwarnings(**f) + try: + return fn(*args, **kw) + finally: + resetwarnings() + try: + safe.__name__ = fn.__name__ + except: + pass + return safe + return decorate + def uses_deprecated(*messages): """Mark a test as immune from fatal deprecation warnings. @@ -183,17 +223,17 @@ def uses_deprecated(*messages): def decorate(fn): def safe(*args, **kw): - global salogging - if salogging is None: - from sqlalchemy import logging as salogging + global sa_exceptions + if sa_exceptions is None: + import sqlalchemy.exceptions as sa_exceptions if not messages: filters = [dict(action='ignore', - category=salogging.SADeprecationWarning)] + category=sa_exceptions.SADeprecationWarning)] else: filters = [dict(action='ignore', message=message, - category=salogging.SADeprecationWarning) + category=sa_exceptions.SADeprecationWarning) for message in [ (m.startswith('//') and ('Call to deprecated function ' + m[2:]) or m) @@ -206,7 +246,7 @@ def uses_deprecated(*messages): finally: resetwarnings() try: - safe.__name__ = fn.name + safe.__name__ = fn.__name__ except: pass return safe @@ -215,11 +255,12 @@ def uses_deprecated(*messages): def resetwarnings(): """Reset warning behavior to testing defaults.""" - global salogging - if salogging is None: - from sqlalchemy import logging as salogging + global sa_exceptions + if sa_exceptions is None: + import sqlalchemy.exceptions as sa_exceptions warnings.resetwarnings() - warnings.filterwarnings('error', category=salogging.SADeprecationWarning) + warnings.filterwarnings('error', category=sa_exceptions.SADeprecationWarning) + warnings.filterwarnings('error', category=sa_exceptions.SAWarning) def against(*queries): """Boolean predicate, compares to testing database configuration. |
