summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/generative.py143
-rwxr-xr-xtest/sql/selectable.py72
-rw-r--r--test/sql/testtypes.py41
3 files changed, 122 insertions, 134 deletions
diff --git a/test/sql/generative.py b/test/sql/generative.py
index 41b4caebf..c787d3404 100644
--- a/test/sql/generative.py
+++ b/test/sql/generative.py
@@ -11,10 +11,10 @@ from sqlalchemy.sql import util as sql_util
class TraversalTest(AssertMixin):
"""test ClauseVisitor's traversal, particularly its ability to copy and modify
a ClauseElement in place."""
-
+
def setUpAll(self):
global A, B
-
+
# establish two ficticious ClauseElements.
# define deep equality semantics as well as deep identity semantics.
class A(ClauseElement):
@@ -23,16 +23,16 @@ class TraversalTest(AssertMixin):
def is_other(self, other):
return other is self
-
+
def __eq__(self, other):
return other.expr == self.expr
-
+
def __ne__(self, other):
return other.expr != self.expr
-
+
def __str__(self):
return "A(%s)" % repr(self.expr)
-
+
class B(ClauseElement):
def __init__(self, *items):
self.items = items
@@ -50,22 +50,22 @@ class TraversalTest(AssertMixin):
if i1 != i2:
return False
return True
-
+
def __ne__(self, other):
for i1, i2 in zip(self.items, other.items):
if i1 != i2:
return True
return False
-
- def _copy_internals(self, clone=_clone):
+
+ def _copy_internals(self, clone=_clone):
self.items = [clone(i) for i in self.items]
def get_children(self, **kwargs):
return self.items
-
+
def __str__(self):
return "B(%s)" % repr([str(i) for i in self.items])
-
+
def test_test_classes(self):
a1 = A("expr1")
struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
@@ -78,22 +78,22 @@ class TraversalTest(AssertMixin):
assert struct != struct3
assert not struct.is_other(struct2)
assert not struct.is_other(struct3)
-
- def test_clone(self):
+
+ def test_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
-
+
class Vis(ClauseVisitor):
def visit_a(self, a):
pass
def visit_b(self, b):
pass
-
+
vis = Vis()
s2 = vis.traverse(struct, clone=True)
assert struct == s2
assert not struct.is_other(s2)
- def test_no_clone(self):
+ def test_no_clone(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
class Vis(ClauseVisitor):
@@ -106,7 +106,7 @@ class TraversalTest(AssertMixin):
s2 = vis.traverse(struct, clone=False)
assert struct == s2
assert struct.is_other(s2)
-
+
def test_change_in_place(self):
struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), A("expr2b")), A("expr3"))
@@ -136,41 +136,41 @@ class TraversalTest(AssertMixin):
s3 = vis2.traverse(struct, clone=True)
assert struct != s3
assert struct3 == s3
-
-
+
+
class ClauseTest(SQLCompileTest):
"""test copy-in-place behavior of various ClauseElements."""
-
+
def setUpAll(self):
global t1, t2
- t1 = table("table1",
+ t1 = table("table1",
column("col1"),
column("col2"),
column("col3"),
)
- t2 = table("table2",
+ t2 = table("table2",
column("col1"),
column("col2"),
column("col3"),
)
-
+
def test_binary(self):
clause = t1.c.col2 == t2.c.col2
assert str(clause) == ClauseVisitor().traverse(clause, clone=True)
-
+
def test_join(self):
clause = t1.join(t2, t1.c.col2==t2.c.col2)
c1 = str(clause)
assert str(clause) == str(ClauseVisitor().traverse(clause, clone=True))
-
+
class Vis(ClauseVisitor):
def visit_binary(self, binary):
binary.right = t2.c.col3
-
+
clause2 = Vis().traverse(clause, clone=True)
assert c1 == str(clause)
assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
-
+
def test_text(self):
clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')])
c1 = str(clause)
@@ -178,13 +178,13 @@ class ClauseTest(SQLCompileTest):
def visit_textclause(self, text):
text.text = text.text + " SOME MODIFIER=:lala"
text.bindparams['lala'] = bindparam('lala')
-
+
clause2 = Vis().traverse(clause, clone=True)
assert c1 == str(clause)
assert str(clause2) == c1 + " SOME MODIFIER=:lala"
assert clause.bindparams.keys() == ['bar']
assert util.Set(clause2.bindparams.keys()) == util.Set(['bar', 'lala'])
-
+
def test_select(self):
s2 = select([t1])
s2_assert = str(s2)
@@ -201,7 +201,7 @@ class ClauseTest(SQLCompileTest):
assert str(s2) == s3_assert
print "------------------"
-
+
s4_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col3==9)))
class Vis(ClauseVisitor):
def visit_select(self, select):
@@ -211,7 +211,7 @@ class ClauseTest(SQLCompileTest):
print str(s4)
assert str(s4) == s4_assert
assert str(s3) == s3_assert
-
+
print "------------------"
s5_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col1==9)))
class Vis(ClauseVisitor):
@@ -224,10 +224,10 @@ class ClauseTest(SQLCompileTest):
print str(s5)
assert str(s5) == s5_assert
assert str(s4) == s4_assert
-
+
def test_binds(self):
"""test that unique bindparams change their name upon clone() to prevent conflicts"""
-
+
s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias()
s2 = ClauseVisitor().traverse(s, clone=True).alias()
s3 = select([s], s.c.col2==s2.c.col2)
@@ -236,7 +236,7 @@ class ClauseTest(SQLCompileTest):
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) AS anon_1, "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 "\
"WHERE anon_1.col2 = anon_2.col2")
-
+
s = select([t1], t1.c.col1==4).alias()
s2 = ClauseVisitor().traverse(s, clone=True).alias()
s3 = select([s], s.c.col2==s2.c.col2)
@@ -244,7 +244,8 @@ class ClauseTest(SQLCompileTest):
"table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_1) AS anon_1, "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_2) AS anon_2 "\
"WHERE anon_1.col2 = anon_2.col2")
-
+
+ @testing.emits_warning('.*replaced by another column with the same key')
def test_alias(self):
subq = t2.select().alias('subq')
s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)])
@@ -260,38 +261,38 @@ class ClauseTest(SQLCompileTest):
s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3, clone=True)
assert orig == str(s) == str(s3) == str(s4)
-
+
def test_correlated_select(self):
s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2)
class Vis(ClauseVisitor):
def visit_select(self, select):
select.append_whereclause(t1.c.col2==7)
-
+
self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2_1")
class ClauseAdapterTest(SQLCompileTest):
def setUpAll(self):
global t1, t2
- t1 = table("table1",
+ t1 = table("table1",
column("col1"),
column("col2"),
column("col3"),
)
- t2 = table("table2",
+ t2 = table("table2",
column("col1"),
column("col2"),
column("col3"),
)
-
+
def test_table_to_alias(self):
-
+
t1alias = t1.alias('t1alias')
-
+
vis = sql_util.ClauseAdapter(t1alias)
ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
assert ff._get_from_objects() == [t1alias]
-
+
self.assert_compile(vis.traverse(select(['*'], from_obj=[t1]), clone=True), "SELECT * FROM table1 AS t1alias")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 WHERE t1alias.col1 = table2.col2")
@@ -306,17 +307,17 @@ class ClauseAdapterTest(SQLCompileTest):
ff = vis.traverse(func.count(t1.c.col1).label('foo'), clone=True)
self.assert_compile(ff, "count(t1alias.col1) AS foo")
assert ff._get_from_objects() == [t1alias]
-
+
# TODO:
# self.assert_compile(vis.traverse(select([func.count(t1.c.col1).label('foo')]), clone=True), "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias")
-
+
t2alias = t2.alias('t2alias')
vis.chain(sql_util.ClauseAdapter(t2alias))
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2")
-
+
def test_include_exclude(self):
m = MetaData()
a=Table( 'a',m,
@@ -331,7 +332,7 @@ class ClauseAdapterTest(SQLCompileTest):
e = sql_util.ClauseAdapter( b, include= set([ a.c.id ]),
equivalents= { a.c.id: set([ a.c.id]) }
).traverse( e)
-
+
assert str(e) == "a_1.id = a.xxx_id"
def test_join_to_alias(self):
@@ -363,7 +364,7 @@ class ClauseAdapterTest(SQLCompileTest):
" LEFT OUTER JOIN d ON a_id = d.aid")
j5 = j3.alias('foo')
j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0]
-
+
# this statement takes c join(a join b), wraps it inside an aliased "select * from c join(a join b) AS foo".
# the outermost right side "left outer join d" stays the same, except "d" joins against foo.a_id instead
# of plain "a_id"
@@ -371,30 +372,30 @@ class ClauseAdapterTest(SQLCompileTest):
"c JOIN (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid FROM a LEFT OUTER JOIN b ON a.id = b.aid) "
"ON b_id = c.bid) AS foo"
" LEFT OUTER JOIN d ON foo.a_id = d.aid")
-
+
def test_derived_from(self):
assert select([t1]).is_derived_from(t1)
assert not select([t2]).is_derived_from(t1)
assert not t1.is_derived_from(select([t1]))
assert t1.alias().is_derived_from(t1)
-
-
+
+
s1 = select([t1, t2]).alias('foo')
s2 = select([s1]).limit(5).offset(10).alias()
assert s2.is_derived_from(s1)
s2 = s2._clone()
assert s2.is_derived_from(s1)
-
+
def test_aliasedselect_to_aliasedselect(self):
# original issue from ticket #904
s1 = select([t1]).alias('foo')
s2 = select([s1]).limit(5).offset(10).alias()
- self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1),
+ self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1),
"SELECT foo.col1, foo.col2, foo.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10")
-
+
j = s1.outerjoin(t2, s1.c.col1==t2.c.col1)
- self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
+ self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
"SELECT anon_1.col1, anon_1.col2, anon_1.col3, table2.col1, table2.col2, table2.col3 FROM "\
"(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\
@@ -402,40 +403,40 @@ class ClauseAdapterTest(SQLCompileTest):
talias = t1.alias('bar')
j = s1.outerjoin(talias, s1.c.col1==talias.c.col1)
- self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
+ self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(),
"SELECT anon_1.col1, anon_1.col2, anon_1.col3, bar.col1, bar.col2, bar.col3 FROM "\
"(SELECT foo.col1 AS col1, foo.col2 AS col2, foo.col3 AS col3 FROM "\
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT 5 OFFSET 10) AS anon_1 "\
"LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1")
-
-
+
+
class SelectTest(SQLCompileTest):
"""tests the generative capability of Select"""
def setUpAll(self):
global t1, t2
- t1 = table("table1",
+ t1 = table("table1",
column("col1"),
column("col2"),
column("col3"),
)
- t2 = table("table2",
+ t2 = table("table2",
column("col1"),
column("col2"),
column("col3"),
)
-
+
def test_select(self):
- self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3),
+ self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3),
"SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1_1 ORDER BY table1.col3")
-
- self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3),
+
+ self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3),
"SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
"FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3")
-
+
s = select([t2], t2.c.col1==t1.c.col1, correlate=False)
s = s.correlate(t1).order_by(t2.c.col3)
- self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3),
+ self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3),
"SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
"FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3")
@@ -444,7 +445,7 @@ class SelectTest(SQLCompileTest):
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
select_copy = s.column('yyy')
self.assert_compile(select_copy, "SELECT table1.col1, table1.col2, table1.col3, yyy FROM table1")
- assert s.columns is not select_copy.columns
+ assert s.columns is not select_copy.columns
assert s._columns is not select_copy._columns
assert s._raw_columns is not select_copy._raw_columns
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
@@ -464,8 +465,8 @@ class SelectTest(SQLCompileTest):
self.assert_compile(s2, "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
"(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 "
"WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
-
- s3 = s.correlate(None)
+
+ s3 = s.correlate(None)
self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
"(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 "
"WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
@@ -479,7 +480,7 @@ class SelectTest(SQLCompileTest):
self.assert_compile(select([t1], t1.c.col2==s3.c.col2), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, "
"(SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2, table1 "
"WHERE table1.col1 = table2.col1) WHERE table1.col2 = col2")
-
+
def test_prefixes(self):
s = t1.select()
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
@@ -488,4 +489,4 @@ class SelectTest(SQLCompileTest):
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
if __name__ == '__main__':
- testbase.main()
+ testbase.main()
diff --git a/test/sql/selectable.py b/test/sql/selectable.py
index 1b9959ec4..850e29e16 100755
--- a/test/sql/selectable.py
+++ b/test/sql/selectable.py
@@ -1,18 +1,18 @@
"""tests that various From objects properly export their columns, as well as
useable primary keys and foreign keys. Full relational algebra depends on
every selectable unit behaving nicely with others.."""
-
+
import testbase
from sqlalchemy import *
from testlib import *
metadata = MetaData()
-table = Table('table1', metadata,
+table = Table('table1', metadata,
Column('col1', Integer, primary_key=True),
Column('col2', String(20)),
Column('col3', Integer),
Column('colx', Integer),
-
+
)
table2 = Table('table2', metadata,
@@ -31,47 +31,47 @@ class SelectableTest(AssertMixin):
#assert s.corresponding_column(table.c.col1) is s.c.col1
assert s.corresponding_column(s.c.col1) is s.c.col1
assert s.corresponding_column(s.c.c1) is s.c.c1
-
+
def testjoinagainstself(self):
jj = select([table.c.col1.label('bar_col1')])
jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
-
+
# test column directly agaisnt itself
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
-
- # test alias of the join, targets the column with the least
+
+ # test alias of the join, targets the column with the least
# "distance" between the requested column and the returned column
# (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than
# there is from j2.c.bar_col1 to table.c.col1)
j2 = jjj.alias('foo')
assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1
-
+
def testselectontable(self):
sel = select([table, table2], use_labels=True)
assert sel.corresponding_column(table.c.col1) is sel.c.table1_col1
assert sel.corresponding_column(table.c.col1, require_embedded=True) is sel.c.table1_col1
assert table.corresponding_column(sel.c.table1_col1) is table.c.col1
assert table.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
-
+
def testjoinagainstjoin(self):
j = outerjoin(table, table2, table.c.col1==table2.c.col2)
jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
-
+
j2 = jjj.alias('foo')
print j2.corresponding_column(jjj.c.table1_col1)
assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
-
+
assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
-
+
def testtablealias(self):
a = table.alias('a')
-
+
j = join(a, table2)
-
+
criterion = a.c.col1 == table2.c.col2
print
print str(j)
@@ -124,7 +124,7 @@ class SelectableTest(AssertMixin):
j1 = table.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
-
+
def testjoin(self):
a = join(table, table2)
print str(a.select(use_labels=True))
@@ -138,7 +138,7 @@ class SelectableTest(AssertMixin):
a = table.select().alias('a')
print str(a.select())
j = join(a, table2)
-
+
criterion = a.c.col1 == table2.c.col2
print criterion
print j.onclause
@@ -148,7 +148,7 @@ class SelectableTest(AssertMixin):
a = table.select(use_labels=True)
print str(a.select())
j = join(a, table2)
-
+
criterion = a.c.table1_col1 == table2.c.col2
print
print str(j)
@@ -163,17 +163,17 @@ class SelectableTest(AssertMixin):
criterion = a.c.acol1 == table2.c.col2
print str(j)
self.assert_(criterion.compare(j.onclause))
-
+
def testselectaliaslabels(self):
a = table2.select(use_labels=True).alias('a')
print str(a.select())
j = join(a, table)
-
+
criterion = table.c.col1 == a.c.table2_col2
print str(criterion)
print str(j.onclause)
self.assert_(criterion.compare(j.onclause))
-
+
def testtablejoinedtoselectoftable(self):
metadata = MetaData()
a = Table('a', metadata,
@@ -195,9 +195,10 @@ class SelectableTest(AssertMixin):
assert j4.corresponding_column(j2.c.aid) is j4.c.aid
assert j4.corresponding_column(a.c.id) is j4.c.id
+ @testing.emits_warning('.*replaced by another column with the same key')
def test_oid(self):
# the oid column of a selectable currently proxies all
- # oid columns found within.
+ # oid columns found within.
s = table.select()
s2 = table2.select()
s3 = select([s, s2])
@@ -205,18 +206,18 @@ class SelectableTest(AssertMixin):
assert s3.corresponding_column(table2.oid_column) is s3.oid_column
assert s3.corresponding_column(s.oid_column) is s3.oid_column
assert s3.corresponding_column(s2.oid_column) is s3.oid_column
-
+
u = s.union(s2)
assert u.corresponding_column(table.oid_column) is u.oid_column
assert u.corresponding_column(table2.oid_column) is u.oid_column
assert u.corresponding_column(s.oid_column) is u.oid_column
assert u.corresponding_column(s2.oid_column) is u.oid_column
-
+
class PrimaryKeyTest(AssertMixin):
def test_join_pk_collapse_implicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
+ """test that redundant columns in a join get 'collapsed' into a minimal primary key,
which is the root column along a chain of foreign key relationships."""
-
+
meta = MetaData()
a = Table('a', meta, Column('id', Integer, primary_key=True))
b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))
@@ -225,7 +226,7 @@ class PrimaryKeyTest(AssertMixin):
assert c.c.id.references(b.c.id)
assert not d.c.id.references(a.c.id)
-
+
assert list(a.join(b).primary_key) == [a.c.id]
assert list(b.join(c).primary_key) == [b.c.id]
assert list(a.join(b).join(c).primary_key) == [a.c.id]
@@ -234,7 +235,7 @@ class PrimaryKeyTest(AssertMixin):
assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
def test_join_pk_collapse_explicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
+ """test that redundant columns in a join get 'collapsed' into a minimal primary key,
which is the root column along a chain of explicit join conditions."""
meta = MetaData()
@@ -251,9 +252,9 @@ class PrimaryKeyTest(AssertMixin):
assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]
assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
-
+
assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]
-
+
def test_init_doesnt_blowitaway(self):
meta = MetaData()
a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
@@ -261,7 +262,7 @@ class PrimaryKeyTest(AssertMixin):
j = a.join(b)
assert list(j.primary_key) == [a.c.id]
-
+
j.foreign_keys
assert list(j.primary_key) == [a.c.id]
@@ -279,20 +280,20 @@ class DerivedTest(AssertMixin):
meta = MetaData()
t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
-
+
assert t1.is_derived_from(t1)
assert not t2.is_derived_from(t1)
-
- def test_alias(self):
+
+ def test_alias(self):
meta = MetaData()
t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
-
+
assert t1.alias().is_derived_from(t1)
assert not t2.alias().is_derived_from(t1)
assert not t1.is_derived_from(t1.alias())
assert not t1.is_derived_from(t2.alias())
-
+
def test_select(self):
meta = MetaData()
t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
@@ -309,4 +310,3 @@ class DerivedTest(AssertMixin):
if __name__ == "__main__":
testbase.main()
-
diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py
index c11f8fcd1..8f1cc461e 100644
--- a/test/sql/testtypes.py
+++ b/test/sql/testtypes.py
@@ -1,6 +1,5 @@
import testbase
-import pickleable
-import datetime, os, re
+import datetime, os, pickleable, re
from sqlalchemy import *
from sqlalchemy import types, exceptions
from sqlalchemy.sql import operators
@@ -341,24 +340,11 @@ class UnicodeTest(AssertMixin):
self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
def testassert(self):
- import warnings
-
- warnings.filterwarnings("always", r".*non-unicode bind")
-
- ## test that data still goes in if warning is emitted....
- unicode_table.insert().execute(unicode_varchar='not unicode')
- assert (select([unicode_table.c.unicode_varchar]).execute().fetchall()
- == [('not unicode', )])
-
- warnings.filterwarnings("error", r".*non-unicode bind")
try:
- try:
- unicode_table.insert().execute(unicode_varchar='not unicode')
- assert False
- except RuntimeWarning, e:
- assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
- finally:
- warnings.filterwarnings("always", r".*non-unicode bind")
+ unicode_table.insert().execute(unicode_varchar='not unicode')
+ assert False
+ except exceptions.SAWarning, e:
+ assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
'assert_unicode':True})
@@ -368,6 +354,14 @@ class UnicodeTest(AssertMixin):
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"
+
+ @testing.emits_warning('.*non-unicode bind')
+ def warns():
+ # test that data still goes in if warning is emitted....
+ unicode_table.insert().execute(unicode_varchar='not unicode')
+ assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])
+ warns()
+
finally:
unicode_engine.dispose()
@@ -674,11 +668,6 @@ class StringTest(AssertMixin):
foo =Table('foo', metadata,
Column('one', String))
- import warnings
- from sqlalchemy.logging import SADeprecationWarning
-
- warnings.filterwarnings("error", r"Using String type with no length.*")
-
# no warning
select([func.count("*")], bind=testbase.db).execute()
@@ -686,7 +675,7 @@ class StringTest(AssertMixin):
# warning during CREATE
foo.create()
assert False
- except SADeprecationWarning, e:
+ except exceptions.SADeprecationWarning, e:
assert "Using String type with no length" in str(e)
assert re.search(r'\bone\b', str(e))
@@ -700,8 +689,6 @@ class StringTest(AssertMixin):
select([func.count("*")], from_obj=bar).execute()
finally:
bar.drop()
- warnings.filterwarnings("always", r"Using String type with no length.*")
-
class NumericTest(AssertMixin):
def setUpAll(self):