diff options
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_query.py | 77 | ||||
| -rw-r--r-- | test/sql/test_select.py | 169 |
2 files changed, 199 insertions, 47 deletions
diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 953dcab7f..2500cde60 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -1116,6 +1116,7 @@ class CompoundTest(TestBase): @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') @testing.fails_on('mysql', 'FIXME: unknown') + @testing.fails_on('sqlite', "Can't handle this style of nesting") def test_except_style1(self): e = except_(union( select([t1.c.col3, t1.c.col4]), @@ -1126,7 +1127,7 @@ class CompoundTest(TestBase): wanted = [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] - found = self._fetchall_sorted(e.alias('bar').select().execute()) + found = self._fetchall_sorted(e.alias().select().execute()) eq_(found, wanted) @testing.crashes('firebird', 'Does not support except') @@ -1134,11 +1135,14 @@ class CompoundTest(TestBase): @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') @testing.fails_on('mysql', 'FIXME: unknown') def test_except_style2(self): + # same as style1, but add alias().select() to the except_(). + # sqlite can handle it now. + e = except_(union( select([t1.c.col3, t1.c.col4]), select([t2.c.col3, t2.c.col4]), select([t3.c.col3, t3.c.col4]), - ).alias('foo').select(), select([t2.c.col3, t2.c.col4])) + ).alias().select(), select([t2.c.col3, t2.c.col4])) wanted = [('aaa', 'aaa'), ('aaa', 'ccc'), ('bbb', 'aaa'), ('bbb', 'bbb'), ('ccc', 'bbb'), ('ccc', 'ccc')] @@ -1146,14 +1150,14 @@ class CompoundTest(TestBase): found1 = self._fetchall_sorted(e.execute()) eq_(found1, wanted) - found2 = self._fetchall_sorted(e.alias('bar').select().execute()) + found2 = self._fetchall_sorted(e.alias().select().execute()) eq_(found2, wanted) @testing.crashes('firebird', 'Does not support except') @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') @testing.fails_on('mysql', 'FIXME: unknown') - @testing.fails_on('sqlite', 'FIXME: unknown') + @testing.fails_on('sqlite', "Can't handle this style of nesting") def test_except_style3(self): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( @@ -1167,16 +1171,73 @@ class CompoundTest(TestBase): eq_(e.alias('foo').select().execute().fetchall(), [('ccc',)]) + @testing.crashes('firebird', 'Does not support except') + @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') + @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') + @testing.fails_on('mysql', 'FIXME: unknown') + def test_except_style4(self): + # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc + e = except_( + select([t1.c.col3]), # aaa, bbb, ccc + except_( + select([t2.c.col3]), # aaa, bbb, ccc + select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc + ).alias().select() + ) + + eq_(e.execute().fetchall(), [('ccc',)]) + eq_( + e.alias().select().execute().fetchall(), + [('ccc',)] + ) + + @testing.crashes('firebird', 'Does not support intersect') + @testing.fails_on('mysql', 'FIXME: unknown') + @testing.fails_on('sqlite', "sqlite can't handle leading parenthesis") + def test_intersect_unions(self): + u = intersect( + union( + select([t1.c.col3, t1.c.col4]), + select([t3.c.col3, t3.c.col4]), + ), + union( + select([t2.c.col3, t2.c.col4]), + select([t3.c.col3, t3.c.col4]), + ).alias().select() + ) + wanted = [('aaa', 'ccc'), ('bbb', 'aaa'), ('ccc', 'bbb')] + found = self._fetchall_sorted(u.execute()) + + eq_(found, wanted) + + @testing.crashes('firebird', 'Does not support intersect') + @testing.fails_on('mysql', 'FIXME: unknown') + def test_intersect_unions_2(self): + u = intersect( + union( + select([t1.c.col3, t1.c.col4]), + select([t3.c.col3, t3.c.col4]), + ).alias().select(), + union( + select([t2.c.col3, t2.c.col4]), + select([t3.c.col3, t3.c.col4]), + ).alias().select() + ) + wanted = [('aaa', 'ccc'), ('bbb', 'aaa'), ('ccc', 'bbb')] + found = self._fetchall_sorted(u.execute()) + + eq_(found, wanted) + @testing.crashes('firebird', 'Does not support intersect') @testing.fails_on('mysql', 'FIXME: unknown') - def test_composite(self): + def test_intersect(self): u = intersect( select([t2.c.col3, t2.c.col4]), union( select([t1.c.col3, t1.c.col4]), select([t2.c.col3, t2.c.col4]), select([t3.c.col3, t3.c.col4]), - ).alias('foo').select() + ).alias().select() ) wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] found = self._fetchall_sorted(u.execute()) @@ -1192,8 +1253,8 @@ class CompoundTest(TestBase): select([t1.c.col3, t1.c.col4]), select([t2.c.col3, t2.c.col4]), select([t3.c.col3, t3.c.col4]), - ).alias('foo').select() - ).alias('bar') + ).alias().select() + ).alias() wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] found = self._fetchall_sorted(ua.select().execute()) diff --git a/test/sql/test_select.py b/test/sql/test_select.py index d063bd2d9..28317db57 100644 --- a/test/sql/test_select.py +++ b/test/sql/test_select.py @@ -1045,20 +1045,31 @@ EXISTS (select yay from foo where boo = lar)", order_by = [table1.c.myid], ) - self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \ -FROM mytable WHERE mytable.myid = :myid_1 UNION \ -SELECT mytable.myid, mytable.name, mytable.description \ -FROM mytable WHERE mytable.myid = :myid_2 ORDER BY mytable.myid") + self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description "\ + "FROM mytable WHERE mytable.myid = :myid_1 UNION "\ + "SELECT mytable.myid, mytable.name, mytable.description "\ + "FROM mytable WHERE mytable.myid = :myid_2 ORDER BY mytable.myid") + x = union( + select([table1]), + select([table1]) + ) + x = union(x, select([table1])) + self.assert_compile(x, "(SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable UNION SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable) UNION SELECT mytable.myid," + " mytable.name, mytable.description FROM mytable") + u1 = union( select([table1.c.myid, table1.c.name]), select([table2]), select([table3]) ) - self.assert_compile(u1, - "SELECT mytable.myid, mytable.name \ -FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \ -FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable") + self.assert_compile(u1, "SELECT mytable.myid, mytable.name " + "FROM mytable UNION SELECT myothertable.otherid, " + "myothertable.othername FROM myothertable " + "UNION SELECT thirdtable.userid, thirdtable.otherstuff " + "FROM thirdtable") assert u1.corresponding_column(table2.c.otherid) is u1.c.myid @@ -1070,21 +1081,23 @@ FROM myothertable UNION SELECT thirdtable.userid, thirdtable.otherstuff FROM thi order_by=['myid'], offset=10, limit=5 - ) - , "SELECT mytable.myid, mytable.name \ -FROM mytable UNION SELECT myothertable.otherid, myothertable.othername \ -FROM myothertable ORDER BY myid LIMIT 5 OFFSET 10" + ), + "SELECT mytable.myid, mytable.name " + "FROM mytable UNION SELECT myothertable.otherid, myothertable.othername " + "FROM myothertable ORDER BY myid LIMIT 5 OFFSET 10" ) self.assert_compile( union( - select([table1.c.myid, table1.c.name, func.max(table1.c.description)], table1.c.name=='name2', group_by=[table1.c.myid, table1.c.name]), + select([table1.c.myid, table1.c.name, func.max(table1.c.description)], + table1.c.name=='name2', + group_by=[table1.c.myid, table1.c.name]), table1.select(table1.c.name=='name1') - ) - , - "SELECT mytable.myid, mytable.name, max(mytable.description) AS max_1 FROM mytable \ -WHERE mytable.name = :name_1 GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \ -FROM mytable WHERE mytable.name = :name_2" + ), + "SELECT mytable.myid, mytable.name, max(mytable.description) AS max_1 " + "FROM mytable WHERE mytable.name = :name_1 GROUP BY mytable.myid, " + "mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.name = :name_2" ) self.assert_compile( @@ -1104,38 +1117,116 @@ FROM mytable WHERE mytable.name = :name_2" ) ) , - "SELECT mytable.myid FROM mytable UNION ALL (SELECT myothertable.otherid FROM myothertable UNION \ -SELECT thirdtable.userid FROM thirdtable)" - ) - # This doesn't need grouping, so don't group to not give sqlite unnecessarily hard time - self.assert_compile( - union( - except_( - select([table2.c.otherid]), - select([table3.c.userid]), - ), - select([table1.c.myid]) - ) - , - "SELECT myothertable.otherid FROM myothertable EXCEPT SELECT thirdtable.userid FROM thirdtable \ -UNION SELECT mytable.myid FROM mytable" + "SELECT mytable.myid FROM mytable UNION ALL " + "(SELECT myothertable.otherid FROM myothertable UNION " + "SELECT thirdtable.userid FROM thirdtable)" ) + s = select([column('foo'), column('bar')]) - s = union(s, s) - s = union(s, s) - self.assert_compile(s, "SELECT foo, bar UNION SELECT foo, bar UNION (SELECT foo, bar UNION SELECT foo, bar)") - - s = select([column('foo'), column('bar')]) + # ORDER BY's even though not supported by all DB's, are rendered if requested self.assert_compile(union(s.order_by("foo"), s.order_by("bar")), "SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar" ) # self_group() is honored - self.assert_compile(union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()), + self.assert_compile( + union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()), "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT 10)" ) + def test_compound_grouping(self): + s = select([column('foo'), column('bar')]).select_from('bat') + + self.assert_compile( + union(union(union(s, s), s), s), + "((SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) " + "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat" + ) + + self.assert_compile( + union(s, s, s, s), + "SELECT foo, bar FROM bat UNION SELECT foo, bar " + "FROM bat UNION SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat" + ) + + self.assert_compile( + union(s, union(s, union(s, s))), + "SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat " + "UNION (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat))" + ) + + self.assert_compile( + select([s.alias()]), + 'SELECT anon_1.foo, anon_1.bar FROM (SELECT foo, bar FROM bat) AS anon_1' + ) + + self.assert_compile( + select([union(s, s).alias()]), + 'SELECT anon_1.foo, anon_1.bar FROM ' + '(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) AS anon_1' + ) + + self.assert_compile( + select([except_(s, s).alias()]), + 'SELECT anon_1.foo, anon_1.bar FROM ' + '(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1' + ) + + # this query sqlite specifically chokes on + self.assert_compile( + union( + except_(s, s), + s + ), + "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) " + "UNION SELECT foo, bar FROM bat" + ) + + self.assert_compile( + union( + s, + except_(s, s), + ), + "SELECT foo, bar FROM bat " + "UNION (SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat)" + ) + + # this solves it + self.assert_compile( + union( + except_(s, s).alias().select(), + s + ), + "SELECT anon_1.foo, anon_1.bar FROM " + "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1 " + "UNION SELECT foo, bar FROM bat" + ) + + self.assert_compile( + except_( + union(s, s), + union(s, s) + ), + "(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) " + "EXCEPT (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat)" + ) + s2 = union(s, s) + s3 = union(s2, s2) + self.assert_compile(s3, "(SELECT foo, bar FROM bat " + "UNION SELECT foo, bar FROM bat) " + "UNION (SELECT foo, bar FROM bat " + "UNION SELECT foo, bar FROM bat)") + + + self.assert_compile( + union( + intersect(s, s), + intersect(s, s) + ), + "(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) " + "UNION (SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat)" + ) @testing.uses_deprecated() def test_binds(self): |
