summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-10-24 14:02:37 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-10-24 14:02:37 -0400
commitc859893cb8f22db3904ec1a6aa5c71d0925fb2e6 (patch)
tree1efac6064d4711c81e00c271b75de8a309930780
parentd3aab7885ab79cc2ae881710f216964c9ee6bcda (diff)
downloadsqlalchemy-c859893cb8f22db3904ec1a6aa5c71d0925fb2e6.tar.gz
- get 100% lint/pep8 happening for test_compiler; next we will begin
cutting up tests and removing old ones - move test_in() to test_operators - slice up migrated operator tests into TOT
-rw-r--r--lib/sqlalchemy/dialects/sybase/__init__.py7
-rw-r--r--test/sql/test_compiler.py1179
-rw-r--r--test/sql/test_generative.py59
-rw-r--r--test/sql/test_operators.py304
4 files changed, 925 insertions, 624 deletions
diff --git a/lib/sqlalchemy/dialects/sybase/__init__.py b/lib/sqlalchemy/dialects/sybase/__init__.py
index 7a91ca673..4502c8f66 100644
--- a/lib/sqlalchemy/dialects/sybase/__init__.py
+++ b/lib/sqlalchemy/dialects/sybase/__init__.py
@@ -6,15 +6,16 @@
from sqlalchemy.dialects.sybase import base, pysybase, pyodbc
+# default dialect
+base.dialect = pyodbc.dialect
from base import CHAR, VARCHAR, TIME, NCHAR, NVARCHAR,\
TEXT,DATE,DATETIME, FLOAT, NUMERIC,\
BIGINT,INT, INTEGER, SMALLINT, BINARY,\
VARBINARY,UNITEXT,UNICHAR,UNIVARCHAR,\
- IMAGE,BIT,MONEY,SMALLMONEY,TINYINT
+ IMAGE,BIT,MONEY,SMALLMONEY,TINYINT,\
+ dialect
-# default dialect
-base.dialect = pyodbc.dialect
__all__ = (
'CHAR', 'VARCHAR', 'TIME', 'NCHAR', 'NVARCHAR',
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index 04443a0ed..0febf190f 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -13,13 +13,21 @@ styling and coherent test organization.
from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
-import datetime, re, operator, decimal
-from sqlalchemy import *
+from sqlalchemy import Integer, String, MetaData, Table, Column, select, \
+ func, not_, cast, text, tuple_, exists, delete, update, bindparam,\
+ insert, literal, and_, null, type_coerce, alias, or_, literal_column,\
+ Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\
+ intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\
+ over, subquery
+import datetime
+import operator
+import decimal
from sqlalchemy import exc, sql, util, types, schema
-from sqlalchemy.sql import table, column, label, compiler
+from sqlalchemy.sql import table, column, label
from sqlalchemy.sql.expression import ClauseList, _literal_as_text, HasPrefixes
from sqlalchemy.engine import default
-from sqlalchemy.databases import *
+from sqlalchemy.dialects import mysql, mssql, postgresql, oracle, \
+ sqlite, sybase
from sqlalchemy.ext.compiler import compiles
table1 = table('mytable',
@@ -48,7 +56,7 @@ table4 = Table(
Column('rem_id', Integer, primary_key=True),
Column('datatype_id', Integer),
Column('value', String(20)),
- schema = 'remote_owner'
+ schema='remote_owner'
)
# table with a 'multipart' schema
@@ -57,7 +65,7 @@ table5 = Table(
Column('rem_id', Integer, primary_key=True),
Column('datatype_id', Integer),
Column('value', String(20)),
- schema = 'dbo.remote_owner'
+ schema='dbo.remote_owner'
)
users = table('users',
@@ -101,15 +109,20 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'Scalar Select expression has no '
'columns; use this object directly within a '
'column-level expression.',
- lambda: hasattr(select([table1.c.myid]).as_scalar().self_group(), 'columns'))
+ lambda: hasattr(
+ select([table1.c.myid]).as_scalar().self_group(),
+ 'columns'))
assert_raises_message(
exc.InvalidRequestError,
'Scalar Select expression has no '
'columns; use this object directly within a '
'column-level expression.',
- lambda: hasattr(select([table1.c.myid]).as_scalar(), 'columns'))
+ lambda: hasattr(select([table1.c.myid]).as_scalar(),
+ 'columns'))
else:
- assert not hasattr(select([table1.c.myid]).as_scalar().self_group(), 'columns')
+ assert not hasattr(
+ select([table1.c.myid]).as_scalar().self_group(),
+ 'columns')
assert not hasattr(select([table1.c.myid]).as_scalar(), 'columns')
def test_prefix_constructor(self):
@@ -127,9 +140,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"mytable.description FROM mytable")
self.assert_compile(select([table1, table2]),
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable")
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable, "
+ "myothertable")
def test_invalid_col_argument(self):
assert_raises(exc.ArgumentError, select, table1)
@@ -165,11 +178,12 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
select([1]).limit(lim).offset(offset),
"SELECT 1 " + exp,
- checkparams =params
+ checkparams=params
)
def test_from_subquery(self):
- """tests placing select statements in the column clause of another select, for the
+ """tests placing select statements in the column clause of
+ another select, for the
purposes of selecting from the exported columns of that select."""
s = select([table1], table1.c.name == 'jack')
@@ -177,17 +191,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
select(
[s],
s.c.myid == 7
- )
- ,
+ ),
"SELECT myid, name, description FROM (SELECT mytable.myid AS myid, "
- "mytable.name AS name, mytable.description AS description FROM mytable "
+ "mytable.name AS name, mytable.description AS description "
+ "FROM mytable "
"WHERE mytable.name = :name_1) WHERE myid = :myid_1")
sq = select([table1])
self.assert_compile(
sq.select(),
- "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, "
- "mytable.name AS name, mytable.description AS description FROM mytable)"
+ "SELECT myid, name, description FROM "
+ "(SELECT mytable.myid AS myid, "
+ "mytable.name AS name, mytable.description "
+ "AS description FROM mytable)"
)
sq = select(
@@ -198,13 +214,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
sq.select(sq.c.myid == 7),
"SELECT sq.myid, sq.name, sq.description FROM "
"(SELECT mytable.myid AS myid, mytable.name AS name, "
- "mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :myid_1"
+ "mytable.description AS description FROM mytable) AS sq "
+ "WHERE sq.myid = :myid_1"
)
sq = select(
[table1, table2],
- and_(table1.c.myid ==7, table2.c.otherid==table1.c.myid),
- use_labels = True
+ and_(table1.c.myid == 7, table2.c.otherid == table1.c.myid),
+ use_labels=True
).alias('sq')
sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS "\
@@ -222,14 +239,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
sq2 = select(
[sq],
- use_labels = True
+ use_labels=True
).alias('sq2')
self.assert_compile(
sq2.select(),
"SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, "
"sq2.sq_mytable_description, sq2.sq_myothertable_otherid, "
- "sq2.sq_myothertable_othername FROM (SELECT sq.mytable_myid AS "
+ "sq2.sq_myothertable_othername FROM "
+ "(SELECT sq.mytable_myid AS "
"sq_mytable_myid, sq.mytable_name AS sq_mytable_name, "
"sq.mytable_description AS sq_mytable_description, "
"sq.myothertable_otherid AS sq_myothertable_otherid, "
@@ -238,13 +256,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_select_from_clauselist(self):
self.assert_compile(
- select([ClauseList(column('a'), column('b'))]).select_from('sometable'),
+ select([ClauseList(column('a'), column('b'))]
+ ).select_from('sometable'),
'SELECT a, b FROM sometable'
)
def test_use_labels(self):
self.assert_compile(
- select([table1.c.myid==5], use_labels=True),
+ select([table1.c.myid == 5], use_labels=True),
"SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable"
)
@@ -255,7 +274,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
select([not_(True)], use_labels=True),
- "SELECT NOT :param_1" # TODO: should this make an anon label ??
+ "SELECT NOT :param_1"
)
self.assert_compile(
@@ -264,7 +283,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
)
self.assert_compile(
- select([func.sum(func.lala(table1.c.myid).label('foo')).label('bar')]),
+ select([func.sum(
+ func.lala(table1.c.myid).label('foo')).label('bar')]),
"SELECT sum(lala(mytable.myid)) AS bar FROM mytable"
)
@@ -285,28 +305,28 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
stmt,
- "select ?, ?, ? from sometable"
- , dialect=default.DefaultDialect(paramstyle='qmark')
+ "select ?, ?, ? from sometable",
+ dialect=default.DefaultDialect(paramstyle='qmark')
)
self.assert_compile(
stmt,
- "select :foo, :bar, :bat from sometable"
- , dialect=default.DefaultDialect(paramstyle='named')
+ "select :foo, :bar, :bat from sometable",
+ dialect=default.DefaultDialect(paramstyle='named')
)
self.assert_compile(
stmt,
- "select %s, %s, %s from sometable"
- , dialect=default.DefaultDialect(paramstyle='format')
+ "select %s, %s, %s from sometable",
+ dialect=default.DefaultDialect(paramstyle='format')
)
self.assert_compile(
stmt,
- "select :1, :2, :3 from sometable"
- , dialect=default.DefaultDialect(paramstyle='numeric')
+ "select :1, :2, :3 from sometable",
+ dialect=default.DefaultDialect(paramstyle='numeric')
)
self.assert_compile(
stmt,
- "select %(foo)s, %(bar)s, %(bat)s from sometable"
- , dialect=default.DefaultDialect(paramstyle='pyformat')
+ "select %(foo)s, %(bar)s, %(bat)s from sometable",
+ dialect=default.DefaultDialect(paramstyle='pyformat')
)
def test_dupe_columns(self):
@@ -315,22 +335,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
select([column('a'), column('a'), column('a')]),
- "SELECT a, a, a"
- , dialect=default.DefaultDialect()
+ "SELECT a, a, a", dialect=default.DefaultDialect()
)
c = column('a')
self.assert_compile(
select([c, c, c]),
- "SELECT a"
- , dialect=default.DefaultDialect()
+ "SELECT a", dialect=default.DefaultDialect()
)
a, b = column('a'), column('b')
self.assert_compile(
select([a, b, b, b, a, a]),
- "SELECT a, b"
- , dialect=default.DefaultDialect()
+ "SELECT a, b", dialect=default.DefaultDialect()
)
# using alternate keys.
@@ -339,20 +356,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
Column('c', Integer, key='a')
self.assert_compile(
select([a, b, c, a, b, c]),
- "SELECT a, b, c"
- , dialect=default.DefaultDialect()
+ "SELECT a, b, c", dialect=default.DefaultDialect()
)
self.assert_compile(
select([bindparam('a'), bindparam('b'), bindparam('c')]),
- "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3"
- , dialect=default.DefaultDialect(paramstyle='named')
+ "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3",
+ dialect=default.DefaultDialect(paramstyle='named')
)
self.assert_compile(
select([bindparam('a'), bindparam('b'), bindparam('c')]),
- "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3"
- , dialect=default.DefaultDialect(paramstyle='qmark'),
+ "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3",
+ dialect=default.DefaultDialect(paramstyle='qmark'),
)
self.assert_compile(
@@ -403,7 +419,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT anon_1.anon_2_x AS anon_1_anon_2_x, "
"anon_1.anon_2_y AS anon_1_anon_2_y, "
"anon_1.anon_2_z AS anon_1_anon_2_z "
- "FROM (SELECT anon_2.x AS anon_2_x, anon_2.y AS anon_2_y, "
+ "FROM (SELECT anon_2.x AS anon_2_x, "
+ "anon_2.y AS anon_2_y, "
"anon_2.z AS anon_2_z FROM "
"(SELECT keyed.x AS x, keyed.y AS y, keyed.z "
"AS z FROM keyed) AS anon_2) AS anon_1"
@@ -421,31 +438,34 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_full_correlate(self):
# intentional
t = table('t', column('a'), column('b'))
- s = select([t.c.a]).where(t.c.a==1).correlate(t).as_scalar()
+ s = select([t.c.a]).where(t.c.a == 1).correlate(t).as_scalar()
s2 = select([t.c.a, s])
- self.assert_compile(s2, """SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t""")
+ self.assert_compile(s2,
+ "SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t")
# unintentional
t2 = table('t2', column('c'), column('d'))
- s = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar()
- s2 =select([t, t2, s])
+ s = select([t.c.a]).where(t.c.a == t2.c.d).as_scalar()
+ s2 = select([t, t2, s])
assert_raises(exc.InvalidRequestError, str, s2)
# intentional again
s = s.correlate(t, t2)
- s2 =select([t, t2, s])
+ s2 = select([t, t2, s])
self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d")
def test_exists(self):
- s = select([table1.c.myid]).where(table1.c.myid==5)
+ s = select([table1.c.myid]).where(table1.c.myid == 5)
self.assert_compile(exists(s),
- "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
+ "EXISTS (SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid = :myid_1)"
)
self.assert_compile(exists(s.as_scalar()),
- "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
+ "EXISTS (SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid = :myid_1)"
)
self.assert_compile(exists([table1.c.myid], table1.c.myid
@@ -479,7 +499,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'EXISTS (SELECT * FROM myothertable WHERE '
'myothertable.otherid = mytable.myid)')
self.assert_compile(table1.select(exists().where(table2.c.otherid
- == table1.c.myid).correlate(table1)).replace_selectable(table2,
+ == table1.c.myid).correlate(table1)
+ ).replace_selectable(table2,
table2.alias()),
'SELECT mytable.myid, mytable.name, '
'mytable.description FROM mytable WHERE '
@@ -487,7 +508,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'myothertable_1 WHERE myothertable_1.otheri'
'd = mytable.myid)')
self.assert_compile(table1.select(exists().where(table2.c.otherid
- == table1.c.myid).correlate(table1)).select_from(table1.join(table2,
+ == table1.c.myid).correlate(table1)).select_from(
+ table1.join(table2,
table1.c.myid
== table2.c.otherid)).replace_selectable(table2,
table2.alias()),
@@ -502,8 +524,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
select([
or_(
- exists().where(table2.c.otherid=='foo'),
- exists().where(table2.c.otherid=='bar')
+ exists().where(table2.c.otherid == 'foo'),
+ exists().where(table2.c.otherid == 'bar')
)
]),
"SELECT (EXISTS (SELECT * FROM myothertable "
@@ -581,7 +603,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'(SELECT myothertable.otherid FROM '
'myothertable WHERE mytable.myid = '
'myothertable.otherid)')
- self.assert_compile(table1.select(order_by=[desc(select([table2.c.otherid],
+ self.assert_compile(table1.select(order_by=[
+ desc(select([table2.c.otherid],
table1.c.myid == table2.c.otherid))]),
'SELECT mytable.myid, mytable.name, '
'mytable.description FROM mytable ORDER BY '
@@ -589,12 +612,12 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'myothertable WHERE mytable.myid = '
'myothertable.otherid) DESC')
- @testing.uses_deprecated('scalar option')
def test_scalar_select(self):
assert_raises_message(
exc.InvalidRequestError,
r"Select objects don't have a type\. Call as_scalar\(\) "
- "on this Select object to return a 'scalar' version of this Select\.",
+ "on this Select object to return a 'scalar' "
+ "version of this Select\.",
func.coalesce, select([table1.c.myid])
)
@@ -618,7 +641,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
s = select([table1.c.myid]).as_scalar()
s2 = s.where(table1.c.myid == 5)
self.assert_compile(
- s2, "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
+ s2,
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
)
self.assert_compile(
s, "(SELECT mytable.myid FROM mytable)"
@@ -688,12 +712,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
column('nm')
)
zip = '12345'
- qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar()
- qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar()
-
- q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
- zips.c.zipcode==zip,
- order_by = ['dist', places.c.nm]
+ qlat = select([zips.c.latitude], zips.c.zipcode == zip).\
+ correlate(None).as_scalar()
+ qlng = select([zips.c.longitude], zips.c.zipcode == zip).\
+ correlate(None).as_scalar()
+
+ q = select([places.c.id, places.c.nm, zips.c.zipcode,
+ func.latlondist(qlat, qlng).label('dist')],
+ zips.c.zipcode == zip,
+ order_by=['dist', places.c.nm]
)
self.assert_compile(q,
@@ -707,8 +734,10 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
':zipcode_3 ORDER BY dist, places.nm')
zalias = zips.alias('main_zip')
- qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
- qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
+ qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).\
+ as_scalar()
+ qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).\
+ as_scalar()
q = select([places.c.id, places.c.nm, zalias.c.zipcode,
func.latlondist(qlat, qlng).label('dist')],
order_by=['dist', places.c.nm])
@@ -723,8 +752,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'dist, places.nm')
a1 = table2.alias('t2alias')
- s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid).as_scalar()
- j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
+ s1 = select([a1.c.otherid], table1.c.myid == a1.c.otherid).as_scalar()
+ j1 = table1.join(table2, table1.c.myid == table2.c.otherid)
s2 = select([table1, s1], from_obj=j1)
self.assert_compile(s2,
'SELECT mytable.myid, mytable.name, '
@@ -743,7 +772,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
':param_1')
self.assert_compile(
- label('bar', column('foo', type_=String))+ 'foo',
+ label('bar', column('foo', type_=String)) + 'foo',
'foo || :param_1')
@@ -758,16 +787,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
)
self.assert_compile(
- and_(table1.c.myid == 12, table1.c.name=='asdf',
+ and_(table1.c.myid == 12, table1.c.name == 'asdf',
table2.c.othername == 'foo', "sysdate() = today()"),
"mytable.myid = :myid_1 AND mytable.name = :name_1 "\
- "AND myothertable.othername = :othername_1 AND sysdate() = today()"
+ "AND myothertable.othername = "
+ ":othername_1 AND sysdate() = today()"
)
self.assert_compile(
and_(
table1.c.myid == 12,
- or_(table2.c.othername=='asdf',
+ or_(table2.c.othername == 'asdf',
table2.c.othername == 'foo', table2.c.otherid == 9),
"sysdate() = today()",
),
@@ -775,7 +805,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
':othername_1 OR myothertable.othername = :othername_2 OR '
'myothertable.otherid = :otherid_1) AND sysdate() = '
'today()',
- checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12}
+ checkparams={'othername_1': 'asdf', 'othername_2': 'foo',
+ 'otherid_1': 9, 'myid_1': 12}
)
def test_nested_conjunctions_short_circuit(self):
@@ -785,18 +816,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
t = table('t', column('x'))
self.assert_compile(
- select([t]).where(and_(t.c.x==5,
- or_(and_(or_(t.c.x==7))))),
+ select([t]).where(and_(t.c.x == 5,
+ or_(and_(or_(t.c.x == 7))))),
"SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2"
)
self.assert_compile(
- select([t]).where(and_(or_(t.c.x==12,
- and_(or_(t.c.x==8))))),
+ select([t]).where(and_(or_(t.c.x == 12,
+ and_(or_(t.c.x == 8))))),
"SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
)
self.assert_compile(
- select([t]).where(and_(or_(or_(t.c.x==12),
- and_(or_(), or_(and_(t.c.x==8)), and_())))),
+ select([t]).where(and_(or_(or_(t.c.x == 12),
+ and_(or_(), or_(and_(t.c.x == 8)), and_())))),
"SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
)
@@ -858,7 +889,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
('a', table1.c.myid, ':myid_1', 'mytable.myid'),
- ('a', literal('b'), ':param_2', ':param_1'), # note swap!
+ ('a', literal('b'), ':param_2', ':param_1'), # note swap!
(table1.c.myid, 'b', 'mytable.myid', ':myid_1'),
(table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
@@ -884,18 +915,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
(operator.neg, '-'),
(operator.inv, 'NOT '),
):
- for expr, sql in (
+ for expr, expected in (
(table1.c.myid, "mytable.myid"),
(literal("foo"), ":param_1"),
):
-
- compiled = str(py_op(expr))
- sql = "%s%s" % (op, sql)
- eq_(compiled, sql)
+ self.assert_compile(py_op(expr), "%s%s" % (op, expected))
def test_negate_operators_2(self):
self.assert_compile(
- table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
+ table1.select((table1.c.myid != 12) & ~(table1.c.name == 'john')),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE mytable.myid != :myid_1 AND mytable.name != :name_1"
)
@@ -903,7 +931,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_negate_operators_3(self):
self.assert_compile(
table1.select((table1.c.myid != 12) &
- ~(table1.c.name.between('jack','john'))),
+ ~(table1.c.name.between('jack', 'john'))),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE mytable.myid != :myid_1 AND "\
"NOT (mytable.name BETWEEN :name_1 AND :name_2)"
@@ -912,7 +940,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_negate_operators_4(self):
self.assert_compile(
table1.select((table1.c.myid != 12) &
- ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
+ ~and_(table1.c.name == 'john',
+ table1.c.name == 'ed',
+ table1.c.name == 'fred')),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE mytable.myid != :myid_1 AND "\
"NOT (mytable.name = :name_1 AND mytable.name = :name_2 "
@@ -928,7 +958,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_commutative_operators(self):
self.assert_compile(
- literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3"
+ literal("a") + literal("b") * literal("c"),
+ ":param_1 || :param_2 * :param_3"
)
def test_op_operators(self):
@@ -993,31 +1024,31 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
(
table1.c.myid.ilike('somstr', escape='\\'),
"mytable.myid ILIKE %(myid_1)s ESCAPE '\\\\'",
- postgresql.PGDialect()),
+ postgresql.dialect()),
(
~table1.c.myid.ilike('somstr', escape='\\'),
"mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\\\'",
- postgresql.PGDialect()),
+ postgresql.dialect()),
(
table1.c.name.ilike('%something%'),
"lower(mytable.name) LIKE lower(:name_1)", None),
(
table1.c.name.ilike('%something%'),
- "mytable.name ILIKE %(name_1)s", postgresql.PGDialect()),
+ "mytable.name ILIKE %(name_1)s", postgresql.dialect()),
(
~table1.c.name.ilike('%something%'),
"lower(mytable.name) NOT LIKE lower(:name_1)", None),
(
~table1.c.name.ilike('%something%'),
"mytable.name NOT ILIKE %(name_1)s",
- postgresql.PGDialect()),
+ postgresql.dialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
def test_match(self):
for expr, check, dialect in [
(table1.c.myid.match('somstr'),
- "mytable.myid MATCH ?", sqlite.SQLiteDialect()),
+ "mytable.myid MATCH ?", sqlite.dialect()),
(table1.c.myid.match('somstr'),
"MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)",
mysql.dialect()),
@@ -1036,7 +1067,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_multiple_col_binds(self):
self.assert_compile(
- select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf',
+ select(["*"], or_(table1.c.myid == 12, table1.c.myid == 'asdf',
table1.c.myid == 'foo')),
"SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
"OR mytable.myid = :myid_2 OR mytable.myid = :myid_3"
@@ -1044,66 +1075,86 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_order_by_nulls(self):
self.assert_compile(
- table2.select(order_by = [table2.c.otherid, table2.c.othername.desc().nullsfirst()]),
+ table2.select(order_by=[table2.c.otherid,
+ table2.c.othername.desc().nullsfirst()]),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC NULLS FIRST"
+ "myothertable ORDER BY myothertable.otherid, "
+ "myothertable.othername DESC NULLS FIRST"
)
self.assert_compile(
- table2.select(order_by = [table2.c.otherid, table2.c.othername.desc().nullslast()]),
+ table2.select(order_by=[
+ table2.c.otherid, table2.c.othername.desc().nullslast()]),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC NULLS LAST"
+ "myothertable ORDER BY myothertable.otherid, "
+ "myothertable.othername DESC NULLS LAST"
)
self.assert_compile(
- table2.select(order_by = [table2.c.otherid.nullslast(), table2.c.othername.desc().nullsfirst()]),
+ table2.select(order_by=[
+ table2.c.otherid.nullslast(),
+ table2.c.othername.desc().nullsfirst()]),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid NULLS LAST, myothertable.othername DESC NULLS FIRST"
+ "myothertable ORDER BY myothertable.otherid NULLS LAST, "
+ "myothertable.othername DESC NULLS FIRST"
)
self.assert_compile(
- table2.select(order_by = [table2.c.otherid.nullsfirst(), table2.c.othername.desc()]),
+ table2.select(order_by=[table2.c.otherid.nullsfirst(),
+ table2.c.othername.desc()]),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid NULLS FIRST, myothertable.othername DESC"
+ "myothertable ORDER BY myothertable.otherid NULLS FIRST, "
+ "myothertable.othername DESC"
)
self.assert_compile(
- table2.select(order_by = [table2.c.otherid.nullsfirst(), table2.c.othername.desc().nullslast()]),
+ table2.select(order_by=[table2.c.otherid.nullsfirst(),
+ table2.c.othername.desc().nullslast()]),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid NULLS FIRST, myothertable.othername DESC NULLS LAST"
+ "myothertable ORDER BY myothertable.otherid NULLS FIRST, "
+ "myothertable.othername DESC NULLS LAST"
)
def test_orderby_groupby(self):
self.assert_compile(
- table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]),
+ table2.select(order_by=[table2.c.otherid,
+ asc(table2.c.othername)]),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
+ "myothertable ORDER BY myothertable.otherid, "
+ "myothertable.othername ASC"
)
self.assert_compile(
- table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
+ table2.select(order_by=[table2.c.otherid,
+ table2.c.othername.desc()]),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
+ "myothertable ORDER BY myothertable.otherid, "
+ "myothertable.othername DESC"
)
# generative order_by
self.assert_compile(
- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
+ table2.select().order_by(table2.c.otherid).\
+ order_by(table2.c.othername.desc()),
"SELECT myothertable.otherid, myothertable.othername FROM "
- "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
+ "myothertable ORDER BY myothertable.otherid, "
+ "myothertable.othername DESC"
)
self.assert_compile(
table2.select().order_by(table2.c.otherid).
- order_by(table2.c.othername.desc()).order_by(None),
- "SELECT myothertable.otherid, myothertable.othername FROM myothertable"
+ order_by(table2.c.othername.desc()
+ ).order_by(None),
+ "SELECT myothertable.otherid, myothertable.othername "
+ "FROM myothertable"
)
self.assert_compile(
select(
[table2.c.othername, func.count(table2.c.otherid)],
- group_by = [table2.c.othername]),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ group_by=[table2.c.othername]),
+ "SELECT myothertable.othername, "
+ "count(myothertable.otherid) AS count_1 "
"FROM myothertable GROUP BY myothertable.othername"
)
@@ -1111,58 +1162,62 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
select([table2.c.othername, func.count(table2.c.otherid)]).
group_by(table2.c.othername),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "SELECT myothertable.othername, "
+ "count(myothertable.otherid) AS count_1 "
"FROM myothertable GROUP BY myothertable.othername"
)
self.assert_compile(
select([table2.c.othername, func.count(table2.c.otherid)]).
group_by(table2.c.othername).group_by(None),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "SELECT myothertable.othername, "
+ "count(myothertable.otherid) AS count_1 "
"FROM myothertable"
)
self.assert_compile(
select([table2.c.othername, func.count(table2.c.otherid)],
- group_by = [table2.c.othername],
- order_by = [table2.c.othername]),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
- "FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
+ group_by=[table2.c.othername],
+ order_by=[table2.c.othername]),
+ "SELECT myothertable.othername, "
+ "count(myothertable.otherid) AS count_1 "
+ "FROM myothertable "
+ "GROUP BY myothertable.othername ORDER BY myothertable.othername"
)
def test_for_update(self):
self.assert_compile(
- table1.select(table1.c.myid==7, for_update=True),
+ table1.select(table1.c.myid == 7, for_update=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
self.assert_compile(
- table1.select(table1.c.myid==7, for_update=False),
+ table1.select(table1.c.myid == 7, for_update=False),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1")
# not supported by dialect, should just use update
self.assert_compile(
- table1.select(table1.c.myid==7, for_update='nowait'),
+ table1.select(table1.c.myid == 7, for_update='nowait'),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
# unknown lock mode
self.assert_compile(
- table1.select(table1.c.myid==7, for_update='unknown_mode'),
+ table1.select(table1.c.myid == 7, for_update='unknown_mode'),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
# ----- mysql
self.assert_compile(
- table1.select(table1.c.myid==7, for_update=True),
+ table1.select(table1.c.myid == 7, for_update=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s FOR UPDATE",
dialect=mysql.dialect())
self.assert_compile(
- table1.select(table1.c.myid==7, for_update="read"),
+ table1.select(table1.c.myid == 7, for_update="read"),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
dialect=mysql.dialect())
@@ -1170,13 +1225,13 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
# ----- oracle
self.assert_compile(
- table1.select(table1.c.myid==7, for_update=True),
+ table1.select(table1.c.myid == 7, for_update=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
dialect=oracle.dialect())
self.assert_compile(
- table1.select(table1.c.myid==7, for_update="nowait"),
+ table1.select(table1.c.myid == 7, for_update="nowait"),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT",
dialect=oracle.dialect())
@@ -1184,40 +1239,41 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
# ----- postgresql
self.assert_compile(
- table1.select(table1.c.myid==7, for_update=True),
+ table1.select(table1.c.myid == 7, for_update=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE",
dialect=postgresql.dialect())
self.assert_compile(
- table1.select(table1.c.myid==7, for_update="nowait"),
+ table1.select(table1.c.myid == 7, for_update="nowait"),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT",
dialect=postgresql.dialect())
self.assert_compile(
- table1.select(table1.c.myid==7, for_update="read"),
+ table1.select(table1.c.myid == 7, for_update="read"),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE",
dialect=postgresql.dialect())
self.assert_compile(
- table1.select(table1.c.myid==7, for_update="read_nowait"),
+ table1.select(table1.c.myid == 7, for_update="read_nowait"),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT",
dialect=postgresql.dialect())
def test_alias(self):
- # test the alias for a table1. column names stay the same, table name "changes" to "foo".
+ # test the alias for a table1. column names stay the same,
+ # table name "changes" to "foo".
self.assert_compile(
- select([table1.alias('foo')])
- ,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
+ select([table1.alias('foo')]),
+ "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
for dialect in (oracle.dialect(),):
self.assert_compile(
- select([table1.alias('foo')])
- ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo"
- ,dialect=dialect)
+ select([table1.alias('foo')]),
+ "SELECT foo.myid, foo.name, foo.description FROM mytable foo",
+ dialect=dialect)
self.assert_compile(
select([table1.alias()]),
@@ -1231,24 +1287,31 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
# from the first table1.
q = select(
[table1, table2.c.otherid],
- table1.c.myid == table2.c.otherid, use_labels = True
+ table1.c.myid == table2.c.otherid, use_labels=True
)
# make an alias of the "selectable". column names
# stay the same (i.e. the labels), table name "changes" to "t2view".
a = alias(q, 't2view')
- # select from that alias, also using labels. two levels of labels should produce two underscores.
+ # select from that alias, also using labels. two levels of labels
+ # should produce two underscores.
# also, reference the column "mytable_myid" off of the t2view alias.
self.assert_compile(
- a.select(a.c.mytable_myid == 9, use_labels = True),
- "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name "
- "AS t2view_mytable_name, t2view.mytable_description AS t2view_mytable_description, "
+ a.select(a.c.mytable_myid == 9, use_labels=True),
+ "SELECT t2view.mytable_myid AS t2view_mytable_myid, "
+ "t2view.mytable_name "
+ "AS t2view_mytable_name, "
+ "t2view.mytable_description AS t2view_mytable_description, "
"t2view.myothertable_otherid AS t2view_myothertable_otherid FROM "
- "(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, "
- "mytable.description AS mytable_description, myothertable.otherid AS "
- "myothertable_otherid FROM mytable, myothertable WHERE mytable.myid = "
- "myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
+ "(SELECT mytable.myid AS mytable_myid, "
+ "mytable.name AS mytable_name, "
+ "mytable.description AS mytable_description, "
+ "myothertable.otherid AS "
+ "myothertable_otherid FROM mytable, myothertable "
+ "WHERE mytable.myid = "
+ "myothertable.otherid) AS t2view "
+ "WHERE t2view.mytable_myid = :mytable_myid_1"
)
@@ -1262,7 +1325,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_prefix_dialect_specific(self):
self.assert_compile(
- table1.select().prefix_with("SQL_CALC_FOUND_ROWS", dialect='sqlite').\
+ table1.select().prefix_with("SQL_CALC_FOUND_ROWS",
+ dialect='sqlite').\
prefix_with("SQL_SOME_WEIRD_MYSQL_THING",
dialect='mysql'),
"SELECT SQL_SOME_WEIRD_MYSQL_THING "
@@ -1272,7 +1336,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_text(self):
self.assert_compile(
- text("select * from foo where lala = bar") ,
+ text("select * from foo where lala = bar"),
"select * from foo where lala = bar"
)
@@ -1280,7 +1344,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(select(
["foobar(a)", "pk_foo_bar(syslaal)"],
"a = 12",
- from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"]
+ from_obj=["foobar left outer join lala on foobar.foo = lala.foo"]
),
"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
"left outer join lala on foobar.foo = lala.foo WHERE a = 12"
@@ -1290,7 +1354,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(select(
[u"foobar(a)", u"pk_foo_bar(syslaal)"],
u"a = 12",
- from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
+ from_obj=[u"foobar left outer join lala on foobar.foo = lala.foo"]
),
"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
"left outer join lala on foobar.foo = lala.foo WHERE a = 12"
@@ -1308,34 +1372,42 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"column1=12 AND column2=19 ORDER BY column1")
self.assert_compile(
- select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
+ select(["column1", "column2"],
+ from_obj=table1).alias('somealias').select(),
"SELECT somealias.column1, somealias.column2 FROM "
"(SELECT column1, column2 FROM mytable) AS somealias"
)
# test that use_labels doesnt interfere with literal columns
self.assert_compile(
- select(["column1", "column2", table1.c.myid], from_obj=table1, use_labels=True),
- "SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable"
+ select(["column1", "column2", table1.c.myid], from_obj=table1,
+ use_labels=True),
+ "SELECT column1, column2, mytable.myid AS mytable_myid "
+ "FROM mytable"
)
- # test that use_labels doesnt interfere with literal columns that have textual labels
+ # test that use_labels doesnt interfere
+ # with literal columns that have textual labels
self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=table1, use_labels=True),
- "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
+ select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
+ from_obj=table1, use_labels=True),
+ "SELECT column1 AS foobar, column2 AS hoho, "
+ "mytable.myid AS mytable_myid FROM mytable"
)
- s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1])
- # test that "auto-labeling of subquery columns" doesnt interfere with literal columns,
+ # test that "auto-labeling of subquery columns"
+ # doesnt interfere with literal columns,
# exported columns dont get quoted
self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(),
+ select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
+ from_obj=[table1]).select(),
"SELECT column1 AS foobar, column2 AS hoho, myid FROM "
- "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
+ "(SELECT column1 AS foobar, column2 AS hoho, "
+ "mytable.myid AS myid FROM mytable)"
)
self.assert_compile(
- select(['col1','col2'], from_obj='tablename').alias('myalias'),
+ select(['col1', 'col2'], from_obj='tablename').alias('myalias'),
"SELECT col1, col2 FROM tablename"
)
@@ -1344,7 +1416,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
text("select * from foo where lala=:bar and hoho=:whee",
bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
"select * from foo where lala=:bar and hoho=:whee",
- checkparams={'bar':4, 'whee': 7},
+ checkparams={'bar': 4, 'whee': 7},
)
self.assert_compile(
@@ -1357,15 +1429,16 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect = postgresql.dialect()
self.assert_compile(
text("select * from foo where lala=:bar and hoho=:whee",
- bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
"select * from foo where lala=%(bar)s and hoho=%(whee)s",
- checkparams={'bar':4, 'whee': 7},
+ checkparams={'bar': 4, 'whee': 7},
dialect=dialect
)
# test escaping out text() params with a backslash
self.assert_compile(
- text("select * from foo where clock='05:06:07' and mork='\:mindy'"),
+ text("select * from foo where clock='05:06:07' "
+ "and mork='\:mindy'"),
"select * from foo where clock='05:06:07' and mork=':mindy'",
checkparams={},
params={},
@@ -1375,9 +1448,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect = sqlite.dialect()
self.assert_compile(
text("select * from foo where lala=:bar and hoho=:whee",
- bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
"select * from foo where lala=? and hoho=?",
- checkparams={'bar':4, 'whee':7},
+ checkparams={'bar': 4, 'whee': 7},
dialect=dialect
)
@@ -1397,7 +1470,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(select(
[alias(table1, 't'), "foo.f"],
"foo.f = t.id",
- from_obj = ["(select f from bar where lala=heyhey) foo"]
+ from_obj=["(select f from bar where lala=heyhey) foo"]
),
"SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
"(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
@@ -1410,7 +1483,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
)
s = select([
- (func.current_date() + literal_column("s.a")).label("dates")
+ (func.current_date() +
+ literal_column("s.a")).label("dates")
]).select_from(generate_series)
self.assert_compile(
s,
@@ -1474,59 +1548,74 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_literal(self):
- self.assert_compile(select([literal('foo')]), "SELECT :param_1 AS anon_1")
+ self.assert_compile(select([literal('foo')]),
+ "SELECT :param_1 AS anon_1")
- self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
+ self.assert_compile(select([literal("foo") + literal("bar")],
+ from_obj=[table1]),
"SELECT :param_1 || :param_2 AS anon_1 FROM mytable")
def test_calculated_columns(self):
- value_tbl = table('values',
+ value_tbl = table('values',
column('id', Integer),
column('val1', Float),
column('val2', Float),
)
- self.assert_compile(
+ self.assert_compile(
select([value_tbl.c.id, (value_tbl.c.val2 -
- value_tbl.c.val1)/value_tbl.c.val1]),
- "SELECT values.id, (values.val2 - values.val1) / values.val1 AS anon_1 FROM values"
+ value_tbl.c.val1) / value_tbl.c.val1]),
+ "SELECT values.id, (values.val2 - values.val1) "
+ "/ values.val1 AS anon_1 FROM values"
)
- self.assert_compile(
+ self.assert_compile(
select([value_tbl.c.id], (value_tbl.c.val2 -
- value_tbl.c.val1)/value_tbl.c.val1 > 2.0),
- "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :param_1"
+ value_tbl.c.val1) / value_tbl.c.val1 > 2.0),
+ "SELECT values.id FROM values WHERE "
+ "(values.val2 - values.val1) / values.val1 > :param_1"
)
- self.assert_compile(
- select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0),
- "SELECT values.id FROM values WHERE (values.val1 / (values.val2 - values.val1)) / values.val1 > :param_1"
+ self.assert_compile(
+ select([value_tbl.c.id], value_tbl.c.val1 /
+ (value_tbl.c.val2 - value_tbl.c.val1) /
+ value_tbl.c.val1 > 2.0),
+ "SELECT values.id FROM values WHERE "
+ "(values.val1 / (values.val2 - values.val1)) "
+ "/ values.val1 > :param_1"
)
def test_collate(self):
for expr in (select([table1.c.name.collate('latin1_german2_ci')]),
select([collate(table1.c.name, 'latin1_german2_ci')])):
self.assert_compile(
- expr, "SELECT mytable.name COLLATE latin1_german2_ci AS anon_1 FROM mytable")
+ expr, "SELECT mytable.name COLLATE latin1_german2_ci "
+ "AS anon_1 FROM mytable")
- assert table1.c.name.collate('latin1_german2_ci').type is table1.c.name.type
+ assert table1.c.name.collate('latin1_german2_ci').type is \
+ table1.c.name.type
- expr = select([table1.c.name.collate('latin1_german2_ci').label('k1')]).order_by('k1')
- self.assert_compile(expr,"SELECT mytable.name COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1")
+ expr = select([table1.c.name.collate('latin1_german2_ci').\
+ label('k1')]).order_by('k1')
+ self.assert_compile(expr,
+ "SELECT mytable.name "
+ "COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1")
expr = select([collate('foo', 'latin1_german2_ci').label('k1')])
- self.assert_compile(expr,"SELECT :param_1 COLLATE latin1_german2_ci AS k1")
+ self.assert_compile(expr,
+ "SELECT :param_1 COLLATE latin1_german2_ci AS k1")
expr = select([table1.c.name.collate('latin1_german2_ci').like('%x%')])
self.assert_compile(expr,
"SELECT mytable.name COLLATE latin1_german2_ci "
"LIKE :param_1 AS anon_1 FROM mytable")
- expr = select([table1.c.name.like(collate('%x%', 'latin1_german2_ci'))])
+ expr = select([table1.c.name.like(collate('%x%',
+ 'latin1_german2_ci'))])
self.assert_compile(expr,
- "SELECT mytable.name "
- "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 "
- "FROM mytable")
+ "SELECT mytable.name "
+ "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 "
+ "FROM mytable")
expr = select([table1.c.name.collate('col1').like(
collate('%x%', 'col2'))])
@@ -1535,7 +1624,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"LIKE :param_1 COLLATE col2 AS anon_1 "
"FROM mytable")
- expr = select([func.concat('a', 'b').collate('latin1_german2_ci').label('x')])
+ expr = select([func.concat('a', 'b').\
+ collate('latin1_german2_ci').label('x')])
self.assert_compile(expr,
"SELECT concat(:param_1, :param_2) "
"COLLATE latin1_german2_ci AS x")
@@ -1556,8 +1646,10 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
t.select(use_labels=True),
'''SELECT "table%name"."percent%" AS "table%name_percent%", '''\
- '''"table%name"."%(oneofthese)s" AS "table%name_%(oneofthese)s", '''\
- '''"table%name"."spaces % more spaces" AS "table%name_spaces % '''\
+ '''"table%name"."%(oneofthese)s" AS '''\
+ '''"table%name_%(oneofthese)s", '''\
+ '''"table%name"."spaces % more spaces" AS '''\
+ '''"table%name_spaces % '''\
'''more spaces" FROM "table%name"'''
)
@@ -1573,7 +1665,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
select(
[table1],
- from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid)]
+ from_obj=[join(table1, table2, table1.c.myid
+ == table2.c.otherid)]
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
@@ -1584,41 +1677,55 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
table3, table1.c.myid == table3.c.userid)]
),
"SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername, thirdtable.userid, "
- "thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid ="
- " myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid"
+ "myothertable.otherid, myothertable.othername, "
+ "thirdtable.userid, "
+ "thirdtable.otherstuff FROM mytable JOIN myothertable "
+ "ON mytable.myid ="
+ " myothertable.otherid JOIN thirdtable ON "
+ "mytable.myid = thirdtable.userid"
)
self.assert_compile(
- join(users, addresses, users.c.user_id==addresses.c.user_id).select(),
+ join(users, addresses, users.c.user_id ==
+ addresses.c.user_id).select(),
"SELECT users.user_id, users.user_name, users.password, "
"addresses.address_id, addresses.user_id, addresses.street, "
- "addresses.city, addresses.state, addresses.zip FROM users JOIN addresses "
+ "addresses.city, addresses.state, addresses.zip "
+ "FROM users JOIN addresses "
"ON users.user_id = addresses.user_id"
)
self.assert_compile(
select([table1, table2, table3],
- from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).
- outerjoin(table3, table1.c.myid==table3.c.userid)]
- )
- ,"SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername, thirdtable.userid,"
- " thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid "
- "= myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid ="
+ from_obj=[join(table1, table2,
+ table1.c.myid == table2.c.otherid).
+ outerjoin(table3,
+ table1.c.myid == table3.c.userid)]
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername, "
+ "thirdtable.userid,"
+ " thirdtable.otherstuff FROM mytable "
+ "JOIN myothertable ON mytable.myid "
+ "= myothertable.otherid LEFT OUTER JOIN thirdtable "
+ "ON mytable.myid ="
" thirdtable.userid"
)
self.assert_compile(
select([table1, table2, table3],
- from_obj = [outerjoin(table1,
- join(table2, table3, table2.c.otherid == table3.c.userid),
- table1.c.myid==table2.c.otherid)]
- )
- ,"SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername, thirdtable.userid,"
- " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable "
- "JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON "
+ from_obj=[outerjoin(table1,
+ join(table2, table3, table2.c.otherid
+ == table3.c.userid),
+ table1.c.myid == table2.c.otherid)]
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername, "
+ "thirdtable.userid,"
+ " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN "
+ "(myothertable "
+ "JOIN thirdtable ON myothertable.otherid = "
+ "thirdtable.userid) ON "
"mytable.myid = myothertable.otherid"
)
@@ -1630,7 +1737,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
table2.c.othername != 'jack',
"EXISTS (select yay from foo where boo = lar)"
),
- from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ]
+ from_obj=[outerjoin(table1, table2,
+ table1.c.myid == table2.c.otherid)]
)
self.assert_compile(query,
"SELECT mytable.myid, mytable.name, mytable.description, "
@@ -1653,34 +1761,40 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
x = union(
select([table1], table1.c.myid == 5),
select([table1], table1.c.myid == 12),
- order_by = [table1.c.myid],
+ order_by=[table1.c.myid],
)
- self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description "\
- "FROM mytable WHERE mytable.myid = :myid_1 UNION "\
- "SELECT mytable.myid, mytable.name, mytable.description "\
- "FROM mytable WHERE mytable.myid = :myid_2 ORDER BY mytable.myid")
+ self.assert_compile(x,
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description "
+ "FROM mytable WHERE "
+ "mytable.myid = :myid_1 UNION "
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_2 "
+ "ORDER BY mytable.myid")
x = union(
select([table1]),
select([table1])
)
x = union(x, select([table1]))
- self.assert_compile(x, "(SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable UNION SELECT mytable.myid, mytable.name, "
- "mytable.description FROM mytable) UNION SELECT mytable.myid,"
- " mytable.name, mytable.description FROM mytable")
+ self.assert_compile(x,
+ "(SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable UNION SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable) UNION SELECT mytable.myid,"
+ " mytable.name, mytable.description FROM mytable")
u1 = union(
select([table1.c.myid, table1.c.name]),
select([table2]),
select([table3])
)
- self.assert_compile(u1, "SELECT mytable.myid, mytable.name "
- "FROM mytable UNION SELECT myothertable.otherid, "
- "myothertable.othername FROM myothertable "
- "UNION SELECT thirdtable.userid, thirdtable.otherstuff "
- "FROM thirdtable")
+ self.assert_compile(u1,
+ "SELECT mytable.myid, mytable.name "
+ "FROM mytable UNION SELECT myothertable.otherid, "
+ "myothertable.othername FROM myothertable "
+ "UNION SELECT thirdtable.userid, thirdtable.otherstuff "
+ "FROM thirdtable")
assert u1.corresponding_column(table2.c.otherid) is u1.c.myid
@@ -1693,21 +1807,26 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
limit=5
),
"SELECT mytable.myid, mytable.name "
- "FROM mytable UNION SELECT myothertable.otherid, myothertable.othername "
+ "FROM mytable UNION SELECT myothertable.otherid, "
+ "myothertable.othername "
"FROM myothertable ORDER BY myid LIMIT :param_1 OFFSET :param_2",
- {'param_1':5, 'param_2':10}
+ {'param_1': 5, 'param_2': 10}
)
self.assert_compile(
union(
- select([table1.c.myid, table1.c.name, func.max(table1.c.description)],
- table1.c.name=='name2',
+ select([table1.c.myid, table1.c.name,
+ func.max(table1.c.description)],
+ table1.c.name == 'name2',
group_by=[table1.c.myid, table1.c.name]),
- table1.select(table1.c.name=='name1')
+ table1.select(table1.c.name == 'name1')
),
- "SELECT mytable.myid, mytable.name, max(mytable.description) AS max_1 "
- "FROM mytable WHERE mytable.name = :name_1 GROUP BY mytable.myid, "
- "mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description "
+ "SELECT mytable.myid, mytable.name, "
+ "max(mytable.description) AS max_1 "
+ "FROM mytable WHERE mytable.name = :name_1 "
+ "GROUP BY mytable.myid, "
+ "mytable.name UNION SELECT mytable.myid, mytable.name, "
+ "mytable.description "
"FROM mytable WHERE mytable.name = :name_2"
)
@@ -1726,8 +1845,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
select([table2.c.otherid]),
select([table3.c.userid]),
)
- )
- ,
+ ),
+
"SELECT mytable.myid FROM mytable UNION ALL "
"(SELECT myothertable.otherid FROM myothertable UNION "
"SELECT thirdtable.userid FROM thirdtable)"
@@ -1736,15 +1855,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
s = select([column('foo'), column('bar')])
- # ORDER BY's even though not supported by all DB's, are rendered if requested
+ # ORDER BY's even though not supported by
+ # all DB's, are rendered if requested
self.assert_compile(union(s.order_by("foo"), s.order_by("bar")),
"SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar"
)
# self_group() is honored
self.assert_compile(
- union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()),
- "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT :param_1)",
- {'param_1':10}
+ union(s.order_by("foo").self_group(),
+ s.order_by("bar").limit(10).self_group()),
+ "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, "
+ "bar ORDER BY bar LIMIT :param_1)",
+ {'param_1': 10}
)
@@ -1760,30 +1882,35 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
union(s, s, s, s),
"SELECT foo, bar FROM bat UNION SELECT foo, bar "
- "FROM bat UNION SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat"
+ "FROM bat UNION SELECT foo, bar FROM bat "
+ "UNION SELECT foo, bar FROM bat"
)
self.assert_compile(
union(s, union(s, union(s, s))),
"SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat "
- "UNION (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat))"
+ "UNION (SELECT foo, bar FROM bat "
+ "UNION SELECT foo, bar FROM bat))"
)
self.assert_compile(
select([s.alias()]),
- 'SELECT anon_1.foo, anon_1.bar FROM (SELECT foo, bar FROM bat) AS anon_1'
+ 'SELECT anon_1.foo, anon_1.bar FROM '
+ '(SELECT foo, bar FROM bat) AS anon_1'
)
self.assert_compile(
select([union(s, s).alias()]),
'SELECT anon_1.foo, anon_1.bar FROM '
- '(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) AS anon_1'
+ '(SELECT foo, bar FROM bat UNION '
+ 'SELECT foo, bar FROM bat) AS anon_1'
)
self.assert_compile(
select([except_(s, s).alias()]),
'SELECT anon_1.foo, anon_1.bar FROM '
- '(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1'
+ '(SELECT foo, bar FROM bat EXCEPT '
+ 'SELECT foo, bar FROM bat) AS anon_1'
)
# this query sqlite specifically chokes on
@@ -1812,7 +1939,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
s
),
"SELECT anon_1.foo, anon_1.bar FROM "
- "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1 "
+ "(SELECT foo, bar FROM bat EXCEPT "
+ "SELECT foo, bar FROM bat) AS anon_1 "
"UNION SELECT foo, bar FROM bat"
)
@@ -1838,7 +1966,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
intersect(s, s)
),
"(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) "
- "UNION (SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat)"
+ "UNION (SELECT foo, bar FROM bat INTERSECT "
+ "SELECT foo, bar FROM bat)"
)
def test_binds(self):
@@ -1871,33 +2000,35 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
{'mytablename':5}, {'mytablename':5}, [5]
),
(
- select([table1], or_(table1.c.myid==bindparam('myid'),
- table2.c.otherid==bindparam('myid'))),
+ select([table1], or_(table1.c.myid == bindparam('myid'),
+ table2.c.otherid == bindparam('myid'))),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = :myid "
"OR myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
- {'myid':None}, [None, None],
- {'myid':5}, {'myid':5}, [5,5]
+ {'myid': None}, [None, None],
+ {'myid': 5}, {'myid': 5}, [5, 5]
),
(
text("SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid"),
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid"),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid",
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = ? OR "
- "myothertable.otherid = ?",
+ "mytable, myothertable WHERE mytable.myid = ? OR "
+ "myothertable.otherid = ?",
{'myid':None}, [None, None],
- {'myid':5}, {'myid':5}, [5,5]
+ {'myid': 5}, {'myid': 5}, [5, 5]
),
(
- select([table1], or_(table1.c.myid==bindparam('myid', unique=True),
- table2.c.otherid==bindparam('myid', unique=True))),
+ select([table1], or_(table1.c.myid ==
+ bindparam('myid', unique=True),
+ table2.c.otherid ==
+ bindparam('myid', unique=True))),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
@@ -1905,7 +2036,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
{'myid_1':None, 'myid_2':None}, [None, None],
- {'myid_1':5, 'myid_2': 6}, {'myid_1':5, 'myid_2':6}, [5,6]
+ {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6]
),
(
bindparam('test', type_=String, required=False) + text("'hi'"),
@@ -1917,8 +2048,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
(
# testing select.params() here - bindparam() objects
# must get required flag set to False
- select([table1], or_(table1.c.myid==bindparam('myid'),
- table2.c.otherid==bindparam('myotherid'))).\
+ select([table1], or_(table1.c.myid == bindparam('myid'),
+ table2.c.otherid == bindparam('myotherid'))).\
params({'myid':8, 'myotherid':7}),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
@@ -1926,47 +2057,56 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
- {'myid':8, 'myotherid':7}, [8, 7],
- {'myid':5}, {'myid':5, 'myotherid':7}, [5,7]
+ {'myid': 8, 'myotherid': 7}, [8, 7],
+ {'myid': 5}, {'myid': 5, 'myotherid': 7}, [5, 7]
),
(
- select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True),
- table2.c.otherid==bindparam('myid', value=8, unique=True))),
+ select([table1], or_(table1.c.myid ==
+ bindparam('myid', value=7, unique=True),
+ table2.c.otherid ==
+ bindparam('myid', value=8, unique=True))),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
- {'myid_1':7, 'myid_2':8}, [7,8],
- {'myid_1':5, 'myid_2':6}, {'myid_1':5, 'myid_2':6}, [5,6]
+ {'myid_1': 7, 'myid_2': 8}, [7, 8],
+ {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6]
),
]:
- self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict)
- self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
+ self.assert_compile(stmt, expected_named_stmt,
+ params=expected_default_params_dict)
+ self.assert_compile(stmt, expected_positional_stmt,
+ dialect=sqlite.dialect())
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
pp = positional.params
- assert [pp[k] for k in positional.positiontup] == expected_default_params_list
- assert nonpositional.construct_params(test_param_dict) == expected_test_params_dict, \
- "expected :%s got %s" % (str(expected_test_params_dict), \
- str(nonpositional.get_params(**test_param_dict)))
+ eq_([pp[k] for k in positional.positiontup],
+ expected_default_params_list)
+
+ eq_(nonpositional.construct_params(test_param_dict),
+ expected_test_params_dict)
pp = positional.construct_params(test_param_dict)
- assert [pp[k] for k in positional.positiontup] == expected_test_params_list
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_test_params_list
+ )
# check that params() doesnt modify original statement
- s = select([table1], or_(table1.c.myid==bindparam('myid'),
- table2.c.otherid==bindparam('myotherid')))
- s2 = s.params({'myid':8, 'myotherid':7})
- s3 = s2.params({'myid':9})
- assert s.compile().params == {'myid':None, 'myotherid':None}
- assert s2.compile().params == {'myid':8, 'myotherid':7}
- assert s3.compile().params == {'myid':9, 'myotherid':7}
+ s = select([table1], or_(table1.c.myid == bindparam('myid'),
+ table2.c.otherid ==
+ bindparam('myotherid')))
+ s2 = s.params({'myid': 8, 'myotherid': 7})
+ s3 = s2.params({'myid': 9})
+ assert s.compile().params == {'myid': None, 'myotherid': None}
+ assert s2.compile().params == {'myid': 8, 'myotherid': 7}
+ assert s3.compile().params == {'myid': 9, 'myotherid': 7}
# test using same 'unique' param object twice in one compile
- s = select([table1.c.myid]).where(table1.c.myid==12).as_scalar()
- s2 = select([table1, s], table1.c.myid==s)
+ s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar()
+ s2 = select([table1, s], table1.c.myid == s)
self.assert_compile(s2,
"SELECT mytable.myid, mytable.name, mytable.description, "
"(SELECT mytable.myid FROM mytable WHERE mytable.myid = "\
@@ -1978,29 +2118,29 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
- s = select([table1], or_(table1.c.myid==7,
- table1.c.myid==bindparam('myid_1')))
+ s = select([table1], or_(table1.c.myid == 7,
+ table1.c.myid == bindparam('myid_1')))
assert_raises_message(exc.CompileError,
"conflicts with unique bind parameter "
"of the same name",
str, s)
- s = select([table1], or_(table1.c.myid==7, table1.c.myid==8,
- table1.c.myid==bindparam('myid_1')))
+ s = select([table1], or_(table1.c.myid == 7, table1.c.myid == 8,
+ table1.c.myid == bindparam('myid_1')))
assert_raises_message(exc.CompileError,
"conflicts with unique bind parameter "
"of the same name",
str, s)
- def test_binds_no_hash_collision(self):
- """test that construct_params doesn't corrupt dict due to hash collisions"""
+ def _test_binds_no_hash_collision(self):
+ """test that construct_params doesn't corrupt dict
+ due to hash collisions"""
total_params = 100000
in_clause = [':in%d' % i for i in range(total_params)]
params = dict(('in%d' % i, i) for i in range(total_params))
- sql = 'text clause %s' % ', '.join(in_clause)
- t = text(sql)
+ t = text('text clause %s' % ', '.join(in_clause))
eq_(len(t.bindparams), total_params)
c = t.compile()
pp = c.construct_params(params)
@@ -2029,8 +2169,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
r"A value is required for bind parameter 'x'",
select([table1]).where(
and_(
- table1.c.myid==bindparam("x", required=True),
- table1.c.name==bindparam("y", required=True)
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True)
)
).compile().construct_params,
params=dict(y=5)
@@ -2039,7 +2179,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises_message(exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
select([table1]).where(
- table1.c.myid==bindparam("x", required=True)
+ table1.c.myid == bindparam("x", required=True)
).compile().construct_params
)
@@ -2048,8 +2188,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"in parameter group 2",
select([table1]).where(
and_(
- table1.c.myid==bindparam("x", required=True),
- table1.c.name==bindparam("y", required=True)
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True)
)
).compile().construct_params,
params=dict(y=5),
@@ -2060,142 +2200,28 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
r"A value is required for bind parameter 'x', "
"in parameter group 2",
select([table1]).where(
- table1.c.myid==bindparam("x", required=True)
+ table1.c.myid == bindparam("x", required=True)
).compile().construct_params,
_group_number=2
)
- @testing.emits_warning('.*empty sequence.*')
- def test_in(self):
- self.assert_compile(table1.c.myid.in_(['a']),
- "mytable.myid IN (:myid_1)")
-
- self.assert_compile(~table1.c.myid.in_(['a']),
- "mytable.myid NOT IN (:myid_1)")
-
- self.assert_compile(table1.c.myid.in_(['a', 'b']),
- "mytable.myid IN (:myid_1, :myid_2)")
-
- self.assert_compile(table1.c.myid.in_(iter(['a', 'b'])),
- "mytable.myid IN (:myid_1, :myid_2)")
-
- self.assert_compile(table1.c.myid.in_([literal('a')]),
- "mytable.myid IN (:param_1)")
-
- self.assert_compile(table1.c.myid.in_([literal('a'), 'b']),
- "mytable.myid IN (:param_1, :myid_1)")
-
- self.assert_compile(table1.c.myid.in_([literal('a'), literal('b')]),
- "mytable.myid IN (:param_1, :param_2)")
-
- self.assert_compile(table1.c.myid.in_(['a', literal('b')]),
- "mytable.myid IN (:myid_1, :param_1)")
-
- self.assert_compile(table1.c.myid.in_([literal(1) + 'a']),
- "mytable.myid IN (:param_1 + :param_2)")
-
- self.assert_compile(table1.c.myid.in_([literal('a') +'a', 'b']),
- "mytable.myid IN (:param_1 || :param_2, :myid_1)")
-
- self.assert_compile(table1.c.myid.in_([literal('a') + literal('a'), literal('b')]),
- "mytable.myid IN (:param_1 || :param_2, :param_3)")
-
- self.assert_compile(table1.c.myid.in_([1, literal(3) + 4]),
- "mytable.myid IN (:myid_1, :param_1 + :param_2)")
-
- self.assert_compile(table1.c.myid.in_([literal('a') < 'b']),
- "mytable.myid IN (:param_1 < :param_2)")
-
- self.assert_compile(table1.c.myid.in_([table1.c.myid]),
- "mytable.myid IN (mytable.myid)")
-
- self.assert_compile(table1.c.myid.in_(['a', table1.c.myid]),
- "mytable.myid IN (:myid_1, mytable.myid)")
-
- self.assert_compile(table1.c.myid.in_([literal('a'), table1.c.myid]),
- "mytable.myid IN (:param_1, mytable.myid)")
-
- self.assert_compile(table1.c.myid.in_([literal('a'), table1.c.myid + 'a']),
- "mytable.myid IN (:param_1, mytable.myid + :myid_1)")
-
- self.assert_compile(table1.c.myid.in_([literal(1), 'a' + table1.c.myid]),
- "mytable.myid IN (:param_1, :myid_1 + mytable.myid)")
-
- self.assert_compile(table1.c.myid.in_([1, 2, 3]),
- "mytable.myid IN (:myid_1, :myid_2, :myid_3)")
-
- self.assert_compile(table1.c.myid.in_(select([table2.c.otherid])),
- "mytable.myid IN (SELECT myothertable.otherid FROM myothertable)")
-
- self.assert_compile(~table1.c.myid.in_(select([table2.c.otherid])),
- "mytable.myid NOT IN (SELECT myothertable.otherid FROM myothertable)")
-
- # text
- self.assert_compile(
- table1.c.myid.in_(
- text("SELECT myothertable.otherid FROM myothertable")
- ),
- "mytable.myid IN (SELECT myothertable.otherid "
- "FROM myothertable)"
- )
-
- # test empty in clause
- self.assert_compile(table1.c.myid.in_([]),
- "mytable.myid != mytable.myid")
-
- self.assert_compile(
- select([table1.c.myid.in_(select([table2.c.otherid]))]),
- "SELECT mytable.myid IN (SELECT myothertable.otherid "
- "FROM myothertable) AS anon_1 FROM mytable"
- )
- self.assert_compile(
- select([table1.c.myid.in_(select([table2.c.otherid]).as_scalar())]),
- "SELECT mytable.myid IN (SELECT myothertable.otherid "
- "FROM myothertable) AS anon_1 FROM mytable"
- )
-
- self.assert_compile(table1.c.myid.in_(
- union(
- select([table1.c.myid], table1.c.myid == 5),
- select([table1.c.myid], table1.c.myid == 12),
- )
- ), "mytable.myid IN ("\
- "SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1 "\
- "UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)")
-
- # test that putting a select in an IN clause does not
- # blow away its ORDER BY clause
- self.assert_compile(
- select([table1, table2],
- table2.c.otherid.in_(
- select([table2.c.otherid], order_by=[table2.c.othername],
- limit=10, correlate=False)
- ),
- from_obj=[table1.join(table2,
- table1.c.myid == table2.c.otherid)],
- order_by=[table1.c.myid]
- ),
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable "\
- "JOIN myothertable ON mytable.myid = myothertable.otherid "
- "WHERE myothertable.otherid IN (SELECT myothertable.otherid "\
- "FROM myothertable ORDER BY myothertable.othername "
- "LIMIT :param_1) ORDER BY mytable.myid",
- {'param_1':10}
- )
def test_tuple(self):
- self.assert_compile(tuple_(table1.c.myid, table1.c.name).in_([(1, 'foo'), (5, 'bar')]),
- "(mytable.myid, mytable.name) IN ((:param_1, :param_2), (:param_3, :param_4))"
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ [(1, 'foo'), (5, 'bar')]),
+ "(mytable.myid, mytable.name) IN "
+ "((:param_1, :param_2), (:param_3, :param_4))"
)
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
[tuple_(table2.c.otherid, table2.c.othername)]
),
- "(mytable.myid, mytable.name) IN ((myothertable.otherid, myothertable.othername))"
+ "(mytable.myid, mytable.name) IN "
+ "((myothertable.otherid, myothertable.othername))"
)
self.assert_compile(
@@ -2227,7 +2253,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
eq_(str(cast(1234, Text).compile(dialect=dialect)),
'CAST(%s AS %s)' % (literal, expected_results[3]))
eq_(str(cast('test', String(20)).compile(dialect=dialect)),
- 'CAST(%s AS %s)' %(literal, expected_results[4]))
+ 'CAST(%s AS %s)' % (literal, expected_results[4]))
+
# fixme: shoving all of this dialect-specific stuff in one test
# is now officialy completely ridiculous AND non-obviously omits
# coverage on other dialects.
@@ -2243,8 +2270,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"anon_1 \nFROM casttest")
# first test with PostgreSQL engine
- check_results(postgresql.dialect(), ['NUMERIC', 'NUMERIC(12, 9)'
- , 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
+ check_results(postgresql.dialect(), ['NUMERIC', 'NUMERIC(12, 9)',
+ 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
# then the Oracle engine
check_results(oracle.dialect(), ['NUMERIC', 'NUMERIC(12, 9)',
@@ -2389,18 +2416,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
table = Table('dt', metadata,
Column('date', Date))
self.assert_compile(
- table.select(table.c.date.between(datetime.date(2006,6,1),
- datetime.date(2006,6,5))),
+ table.select(table.c.date.between(datetime.date(2006, 6, 1),
+ datetime.date(2006, 6, 5))),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
- checkparams={'date_1':datetime.date(2006,6,1),
- 'date_2':datetime.date(2006,6,5)})
+ checkparams={'date_1': datetime.date(2006, 6, 1),
+ 'date_2': datetime.date(2006, 6, 5)})
self.assert_compile(
- table.select(sql.between(table.c.date, datetime.date(2006,6,1),
- datetime.date(2006,6,5))),
+ table.select(sql.between(table.c.date, datetime.date(2006, 6, 1),
+ datetime.date(2006, 6, 5))),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
- checkparams={'date_1':datetime.date(2006,6,1),
- 'date_2':datetime.date(2006,6,5)})
+ checkparams={'date_1': datetime.date(2006, 6, 1),
+ 'date_2': datetime.date(2006, 6, 5)})
def test_delayed_col_naming(self):
@@ -2462,7 +2489,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
t1 = Table('mytable', meta, Column('col1', Integer))
exprs = (
- table1.c.myid==12,
+ table1.c.myid == 12,
func.hoho(table1.c.myid),
cast(table1.c.name, Numeric)
)
@@ -2470,9 +2497,11 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
(table1.c.name, 'name', 'mytable.name', None),
(exprs[0], str(exprs[0]), 'mytable.myid = :myid_1', 'anon_1'),
(exprs[1], str(exprs[1]), 'hoho(mytable.myid)', 'hoho_1'),
- (exprs[2], str(exprs[2]), 'CAST(mytable.name AS NUMERIC)', 'anon_1'),
+ (exprs[2], str(exprs[2]),
+ 'CAST(mytable.name AS NUMERIC)', 'anon_1'),
(t1.c.col1, 'col1', 'mytable.col1', None),
- (column('some wacky thing'), 'some wacky thing', '"some wacky thing"', '')
+ (column('some wacky thing'), 'some wacky thing',
+ '"some wacky thing"', '')
):
if getattr(col, 'table', None) is not None:
t = col.table
@@ -2483,7 +2512,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert s1.c.keys() == [key], s1.c.keys()
if label:
- self.assert_compile(s1, "SELECT %s AS %s FROM mytable" % (expr, label))
+ self.assert_compile(s1,
+ "SELECT %s AS %s FROM mytable" % (expr, label))
else:
self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,))
@@ -2496,11 +2526,11 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
# sqlite rule labels subquery columns
self.assert_compile(s1,
"SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
- (key,expr, key))
+ (key, expr, key))
else:
self.assert_compile(s1,
"SELECT %s FROM (SELECT %s FROM mytable)" %
- (expr,expr))
+ (expr, expr))
def test_hints(self):
s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s")
@@ -2514,33 +2544,24 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
subs4 = select([
table1, table2
- ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).\
+ ]).select_from(table1.join(table2, table1.c.myid == table2.c.otherid)).\
with_hint(table1, 'hint1')
s4 = select([table3]).select_from(
table3.join(
subs4,
- subs4.c.othername==table3.c.otherstuff
+ subs4.c.othername == table3.c.otherstuff
)
).\
with_hint(table3, 'hint3')
- subs5 = select([
- table1, table2
- ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid))
- s5 = select([table3]).select_from(
- table3.join(
- subs5,
- subs5.c.othername==table3.c.otherstuff
- )
- ).\
- with_hint(table3, 'hint3').\
- with_hint(table1, 'hint1')
t1 = table('QuotedName', column('col1'))
- s6 = select([t1.c.col1]).where(t1.c.col1>10).with_hint(t1, '%(name)s idx1')
+ s6 = select([t1.c.col1]).where(t1.c.col1 > 10).\
+ with_hint(t1, '%(name)s idx1')
a2 = t1.alias('SomeName')
- s7 = select([a2.c.col1]).where(a2.c.col1>10).with_hint(a2, '%(name)s idx1')
+ s7 = select([a2.c.col1]).where(a2.c.col1 > 10).\
+ with_hint(a2, '%(name)s idx1')
mysql_d, oracle_d, sybase_d = \
mysql.dialect(), \
@@ -2637,28 +2658,31 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
# cols provided literally
self.assert_compile(
insert(table1, {
- table1.c.myid : bindparam('userid'),
- table1.c.name : bindparam('username')}),
+ table1.c.myid: bindparam('userid'),
+ table1.c.name: bindparam('username')}),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
# insert with user-supplied bind params for specific columns, cols
# provided as strings
self.assert_compile(
- insert(table1, dict(myid = 3, name = 'jack')),
+ insert(table1, dict(myid=3, name='jack')),
"INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
)
# test with a tuple of params instead of named
self.assert_compile(
insert(table1, (3, 'jack', 'mydescription')),
- "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)",
- checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'}
+ "INSERT INTO mytable (myid, name, description) VALUES "
+ "(:myid, :name, :description)",
+ checkparams={
+ 'myid': 3, 'name': 'jack', 'description': 'mydescription'}
)
self.assert_compile(
insert(table1, values={
- table1.c.myid : bindparam('userid')
- }).values({table1.c.name : bindparam('username')}),
+ table1.c.myid: bindparam('userid')
+ }).values(
+ {table1.c.name: bindparam('username')}),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
)
@@ -2693,63 +2717,65 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
update(table1, table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
- params = {table1.c.name:'fred'})
+ params={table1.c.name: 'fred'})
self.assert_compile(
- table1.update().where(table1.c.myid==7).
- values({table1.c.myid:5}),
+ table1.update().where(table1.c.myid == 7).
+ values({table1.c.myid: 5}),
"UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
- checkparams={'myid':5, 'myid_1':7})
+ checkparams={'myid': 5, 'myid_1': 7})
self.assert_compile(
update(table1, table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
- params = {'name':'fred'})
+ params={'name': 'fred'})
self.assert_compile(
- update(table1, values = {table1.c.name : table1.c.myid}),
+ update(table1, values={table1.c.name: table1.c.myid}),
"UPDATE mytable SET name=mytable.myid")
self.assert_compile(
update(table1,
- whereclause = table1.c.name == bindparam('crit'),
- values = {table1.c.name : 'hi'}),
+ whereclause=table1.c.name == bindparam('crit'),
+ values={table1.c.name: 'hi'}),
"UPDATE mytable SET name=:name WHERE mytable.name = :crit",
- params = {'crit' : 'notthere'},
- checkparams={'crit':'notthere', 'name':'hi'})
+ params={'crit': 'notthere'},
+ checkparams={'crit': 'notthere', 'name': 'hi'})
self.assert_compile(
update(table1, table1.c.myid == 12,
- values = {table1.c.name : table1.c.myid}),
+ values={table1.c.name: table1.c.myid}),
"UPDATE mytable SET name=mytable.myid, description="
":description WHERE mytable.myid = :myid_1",
- params = {'description':'test'},
- checkparams={'description':'test', 'myid_1':12})
+ params={'description': 'test'},
+ checkparams={'description': 'test', 'myid_1': 12})
self.assert_compile(
update(table1, table1.c.myid == 12,
- values = {table1.c.myid : 9}),
+ values={table1.c.myid: 9}),
"UPDATE mytable SET myid=:myid, description=:description "
"WHERE mytable.myid = :myid_1",
- params = {'myid_1': 12, 'myid': 9, 'description': 'test'})
+ params={'myid_1': 12, 'myid': 9, 'description': 'test'})
self.assert_compile(
- update(table1, table1.c.myid ==12),
+ update(table1, table1.c.myid == 12),
"UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
- params={'myid':18}, checkparams={'myid':18, 'myid_1':12})
- s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
+ params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12})
+ s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
c = s.compile(column_keys=['id', 'name'])
self.assert_compile(
update(table1, table1.c.myid == 12,
- values = {table1.c.name : table1.c.myid}
- ).values({table1.c.name:table1.c.name + 'foo'}),
+ values={table1.c.name: table1.c.myid}
+ ).values({table1.c.name: table1.c.name + 'foo'}),
"UPDATE mytable SET name=(mytable.name || :name_1), "
"description=:description WHERE mytable.myid = :myid_1",
- params = {'description':'test'})
+ params={'description': 'test'})
eq_(str(s), str(c))
self.assert_compile(update(table1,
(table1.c.myid == func.hoho(4)) &
- (table1.c.name == literal('foo') + table1.c.name + literal('lala')),
- values = {
- table1.c.name : table1.c.name + "lala",
- table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
+ (table1.c.name == literal('foo') +
+ table1.c.name + literal('lala')),
+ values={
+ table1.c.name: table1.c.name + "lala",
+ table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho'))
}), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), "
"name=(mytable.name || :name_1) "
- "WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || "
+ "WHERE mytable.myid = hoho(:hoho_1) "
+ "AND mytable.name = :param_2 || "
"mytable.name || :param_3")
def test_update_prefix(self):
@@ -2768,12 +2794,12 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
update(talias1, talias1.c.myid == 7),
"UPDATE mytable AS t1 SET name=:name WHERE t1.myid = :myid_1",
- params = {table1.c.name:'fred'})
+ params={table1.c.name: 'fred'})
self.assert_compile(
update(talias1, table1.c.myid == 7),
"UPDATE mytable AS t1 SET name=:name FROM "
"mytable WHERE mytable.myid = :myid_1",
- params = {table1.c.name:'fred'})
+ params={table1.c.name: 'fred'})
def test_update_to_expression(self):
"""test update from an expression.
@@ -2792,33 +2818,35 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
def test_correlated_update(self):
# test against a straight text subquery
- u = update(table1, values = {
- table1.c.name :
+ u = update(table1, values={
+ table1.c.name:
text("(select name from mytable where id=mytable.id)")})
self.assert_compile(u,
"UPDATE mytable SET name=(select name from mytable "
"where id=mytable.id)")
mt = table1.alias()
- u = update(table1, values = {
- table1.c.name :
- select([mt.c.name], mt.c.myid==table1.c.myid)
+ u = update(table1, values={
+ table1.c.name:
+ select([mt.c.name], mt.c.myid == table1.c.myid)
})
self.assert_compile(u,
"UPDATE mytable SET name=(SELECT mytable_1.name FROM "
- "mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
+ "mytable AS mytable_1 WHERE "
+ "mytable_1.myid = mytable.myid)")
# test against a regular constructed subquery
s = select([table2], table2.c.otherid == table1.c.myid)
- u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
+ u = update(table1, table1.c.name == 'jack', values={table1.c.name: s})
self.assert_compile(u,
"UPDATE mytable SET name=(SELECT myothertable.otherid, "
"myothertable.othername FROM myothertable WHERE "
- "myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")
+ "myothertable.otherid = mytable.myid) "
+ "WHERE mytable.name = :name_1")
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
- u = update(table1, table1.c.name==s)
+ u = update(table1, table1.c.name == s)
self.assert_compile(u,
"UPDATE mytable SET myid=:myid, name=:name, "
"description=:description WHERE mytable.name = "
@@ -2827,7 +2855,7 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
- u = table1.update(table1.c.name==s)
+ u = table1.update(table1.c.name == s)
self.assert_compile(u,
"UPDATE mytable SET myid=:myid, name=:name, "
"description=:description WHERE mytable.name = "
@@ -2868,7 +2896,7 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
"DELETE FROM mytable WHERE mytable.myid = :myid_1")
self.assert_compile(
table1.delete().where(table1.c.myid == 7).\
- where(table1.c.name=='somename'),
+ where(table1.c.name == 'somename'),
"DELETE FROM mytable WHERE mytable.myid = :myid_1 "
"AND mytable.name = :name_1")
@@ -2891,13 +2919,14 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
def test_correlated_delete(self):
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
- u = delete(table1, table1.c.name==s)
- self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "\
- "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)")
+ u = delete(table1, table1.c.name == s)
+ self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "
+ "(SELECT myothertable.othername FROM myothertable "
+ "WHERE myothertable.otherid = :otherid_1)")
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
- u = table1.delete(table1.c.name==s)
+ u = table1.delete(table1.c.name == s)
self.assert_compile(u,
"DELETE FROM mytable WHERE mytable.name = (SELECT "
"myothertable.othername FROM myothertable WHERE "
@@ -2909,7 +2938,7 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
t = table('foo', column('x'), column('y'))
- u = t.update().where(t.c.x==bindparam('x'))
+ u = t.update().where(t.c.x == bindparam('x'))
assert_raises(exc.CompileError, u.compile)
@@ -2917,27 +2946,30 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises(exc.CompileError, u.values(x=7).compile)
- self.assert_compile(u.values(y=7), "UPDATE foo SET y=:y WHERE foo.x = :x")
+ self.assert_compile(u.values(y=7),
+ "UPDATE foo SET y=:y WHERE foo.x = :x")
- assert_raises(exc.CompileError, u.values(x=7).compile, column_keys=['x', 'y'])
+ assert_raises(exc.CompileError,
+ u.values(x=7).compile, column_keys=['x', 'y'])
assert_raises(exc.CompileError, u.compile, column_keys=['x', 'y'])
self.assert_compile(u.values(x=3 + bindparam('x')),
- "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
+ "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
self.assert_compile(u.values(x=3 + bindparam('x')),
- "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
- params={'x':1})
+ "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
+ params={'x': 1})
self.assert_compile(u.values(x=3 + bindparam('x')),
- "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x",
- params={'x':1, 'y':2})
+ "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x",
+ params={'x': 1, 'y': 2})
i = t.insert().values(x=3 + bindparam('x'))
- self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
self.assert_compile(i,
- "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
- params={'x':1, 'y':2})
+ "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
+ self.assert_compile(i,
+ "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
+ params={'x': 1, 'y': 2})
i = t.insert().values(x=bindparam('y'))
self.assert_compile(i, "INSERT INTO foo (x) VALUES (:y)")
@@ -2949,12 +2981,16 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises(exc.CompileError, i.compile)
i = t.insert().values(x=3 + bindparam('x2'))
- self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
- self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={})
- self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
- params={'x':1, 'y':2})
- self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
- params={'x2':1, 'y':2})
+ self.assert_compile(i,
+ "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
+ self.assert_compile(i,
+ "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={})
+ self.assert_compile(i,
+ "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
+ params={'x': 1, 'y': 2})
+ self.assert_compile(i,
+ "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
+ params={'x2': 1, 'y': 2})
def test_unconsumed_names(self):
t = table("t", column("x"), column("y"))
@@ -2973,8 +3009,8 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises_message(
exc.CompileError,
"Unconsumed column names: j",
- t.update().values(x=5, j=7).values({t2.c.z:5}).
- where(t.c.x==t2.c.q).compile,
+ t.update().values(x=5, j=7).values({t2.c.z: 5}).
+ where(t.c.x == t2.c.q).compile,
)
# bindparam names don't get counted
@@ -2989,7 +3025,7 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
i,
"INSERT INTO t (x) VALUES ((:param_1 + :x2))",
- params={"x2":1}
+ params={"x2": 1}
)
assert_raises_message(
@@ -3005,12 +3041,12 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
t = table('foo', column('id'), column('foo_id'))
self.assert_compile(
- t.update().where(t.c.id==5),
+ t.update().where(t.c.id == 5),
"UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :id_1"
)
self.assert_compile(
- t.update().where(t.c.id==bindparam(key=t.c.id._label)),
+ t.update().where(t.c.id == bindparam(key=t.c.id._label)),
"UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1"
)
@@ -3057,12 +3093,13 @@ class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL):
def test_insert(self):
m = MetaData()
- foo = Table('foo', m,
+ foo = Table('foo', m,
Column('id', Integer))
t = Table('test', m,
Column('col1', Integer, default=func.foo(1)),
- Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])),
+ Column('col2', Integer, default=select(
+ [func.coalesce(func.max(foo.c.id))])),
)
self.assert_compile(t.insert(inline=True, values={}),
@@ -3072,16 +3109,17 @@ class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL):
def test_update(self):
m = MetaData()
- foo = Table('foo', m,
+ foo = Table('foo', m,
Column('id', Integer))
t = Table('test', m,
Column('col1', Integer, onupdate=func.foo(1)),
- Column('col2', Integer, onupdate=select([func.coalesce(func.max(foo.c.id))])),
+ Column('col2', Integer, onupdate=select(
+ [func.coalesce(func.max(foo.c.id))])),
Column('col3', String(30))
)
- self.assert_compile(t.update(inline=True, values={'col3':'foo'}),
+ self.assert_compile(t.update(inline=True, values={'col3': 'foo'}),
"UPDATE test SET col1=foo(:foo_1), col2=(SELECT "
"coalesce(max(foo.id)) AS coalesce_1 FROM foo), "
"col3=:col3")
@@ -3091,55 +3129,70 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
def test_select(self):
self.assert_compile(table4.select(),
- "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
- " remote_owner.remotetable.value FROM remote_owner.remotetable")
-
- self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
- "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
- " remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "
+ "SELECT remote_owner.remotetable.rem_id, "
+ "remote_owner.remotetable.datatype_id,"
+ " remote_owner.remotetable.value "
+ "FROM remote_owner.remotetable")
+
+ self.assert_compile(table4.select(and_(table4.c.datatype_id == 7,
+ table4.c.value == 'hi')),
+ "SELECT remote_owner.remotetable.rem_id, "
+ "remote_owner.remotetable.datatype_id,"
+ " remote_owner.remotetable.value "
+ "FROM remote_owner.remotetable WHERE "
"remote_owner.remotetable.datatype_id = :datatype_id_1 AND"
" remote_owner.remotetable.value = :value_1")
- s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'), use_labels=True)
+ s = table4.select(and_(table4.c.datatype_id == 7,
+ table4.c.value == 'hi'), use_labels=True)
self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS"
- " remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS"
- " remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "
- "AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "
+ " remote_owner_remotetable_rem_id, "
+ "remote_owner.remotetable.datatype_id AS"
+ " remote_owner_remotetable_datatype_id, "
+ "remote_owner.remotetable.value "
+ "AS remote_owner_remotetable_value FROM "
+ "remote_owner.remotetable WHERE "
"remote_owner.remotetable.datatype_id = :datatype_id_1 AND "
"remote_owner.remotetable.value = :value_1")
# multi-part schema name
self.assert_compile(table5.select(),
'SELECT "dbo.remote_owner".remotetable.rem_id, '
- '"dbo.remote_owner".remotetable.datatype_id, "dbo.remote_owner".remotetable.value '
+ '"dbo.remote_owner".remotetable.datatype_id, '
+ '"dbo.remote_owner".remotetable.value '
'FROM "dbo.remote_owner".remotetable'
)
# multi-part schema name labels - convert '.' to '_'
self.assert_compile(table5.select(use_labels=True),
'SELECT "dbo.remote_owner".remotetable.rem_id AS'
- ' dbo_remote_owner_remotetable_rem_id, "dbo.remote_owner".remotetable.datatype_id'
+ ' dbo_remote_owner_remotetable_rem_id, '
+ '"dbo.remote_owner".remotetable.datatype_id'
' AS dbo_remote_owner_remotetable_datatype_id,'
- ' "dbo.remote_owner".remotetable.value AS dbo_remote_owner_remotetable_value FROM'
+ ' "dbo.remote_owner".remotetable.value AS '
+ 'dbo_remote_owner_remotetable_value FROM'
' "dbo.remote_owner".remotetable'
)
def test_alias(self):
a = alias(table4, 'remtable')
- self.assert_compile(a.select(a.c.datatype_id==7),
- "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM"
+ self.assert_compile(a.select(a.c.datatype_id == 7),
+ "SELECT remtable.rem_id, remtable.datatype_id, "
+ "remtable.value FROM"
" remote_owner.remotetable AS remtable "
"WHERE remtable.datatype_id = :datatype_id_1")
def test_update(self):
self.assert_compile(
- table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}),
+ table4.update(table4.c.value == 'test',
+ values={table4.c.datatype_id: 12}),
"UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "
"WHERE remote_owner.remotetable.value = :value_1")
def test_insert(self):
self.assert_compile(table4.insert(values=(2, 5, 'test')),
- "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "
+ "INSERT INTO remote_owner.remotetable "
+ "(rem_id, datatype_id, value) VALUES "
"(:rem_id, :datatype_id, :value)")
@@ -3152,15 +3205,12 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
Column('id', Integer))
def test_null_constant(self):
- t = self._fixture()
self.assert_compile(_literal_as_text(None), "NULL")
def test_false_constant(self):
- t = self._fixture()
self.assert_compile(_literal_as_text(False), "false")
def test_true_constant(self):
- t = self._fixture()
self.assert_compile(_literal_as_text(True), "true")
def test_val_and_false(self):
@@ -3257,7 +3307,8 @@ class ResultMapTest(fixtures.TestBase):
)
def test_label_conflict_union(self):
- t1 = Table('t1', MetaData(), Column('a', Integer), Column('b', Integer))
+ t1 = Table('t1', MetaData(), Column('a', Integer),
+ Column('b', Integer))
t2 = Table('t2', MetaData(), Column('t1_a', Integer))
union = select([t2]).union(select([t2])).alias()
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index d0a6522d5..fdec2f840 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -166,7 +166,7 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):
foo, bar = CustomObj('foo', String), CustomObj('bar', String)
bin = foo == bar
- s = set(ClauseVisitor().iterate(bin))
+ set(ClauseVisitor().iterate(bin))
assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin])
class BinaryEndpointTraversalTest(fixtures.TestBase):
@@ -204,7 +204,7 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):
column("f")
)
expr = tuple_(
- a, b, b1==tuple_(b1a, b1b == d), c
+ a, b, b1 == tuple_(b1a, b1b == d), c
) > tuple_(
func.go(e + f)
)
@@ -309,7 +309,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1")
def test_join(self):
- clause = t1.join(t2, t1.c.col2==t2.c.col2)
+ clause = t1.join(t2, t1.c.col2 == t2.c.col2)
c1 = str(clause)
assert str(clause) == str(CloningVisitor().traverse(clause))
@@ -319,7 +319,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
clause2 = Vis().traverse(clause)
assert c1 == str(clause)
- assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3))
+ assert str(clause2) == str(t1.join(t2, t1.c.col2 == t2.c.col3))
def test_aliased_column_adapt(self):
clause = t1.select()
@@ -439,10 +439,10 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def test_select(self):
s2 = select([t1])
s2_assert = str(s2)
- s3_assert = str(select([t1], t1.c.col2==7))
+ s3_assert = str(select([t1], t1.c.col2 == 7))
class Vis(CloningVisitor):
def visit_select(self, select):
- select.append_whereclause(t1.c.col2==7)
+ select.append_whereclause(t1.c.col2 == 7)
s3 = Vis().traverse(s2)
assert str(s3) == s3_assert
assert str(s2) == s2_assert
@@ -450,24 +450,21 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
print str(s3)
class Vis(ClauseVisitor):
def visit_select(self, select):
- select.append_whereclause(t1.c.col2==7)
+ select.append_whereclause(t1.c.col2 == 7)
Vis().traverse(s2)
assert str(s2) == s3_assert
- print "------------------"
-
- s4_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col3==9)))
+ s4_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col3 == 9)))
class Vis(CloningVisitor):
def visit_select(self, select):
- select.append_whereclause(t1.c.col3==9)
+ select.append_whereclause(t1.c.col3 == 9)
s4 = Vis().traverse(s3)
print str(s3)
print str(s4)
assert str(s4) == s4_assert
assert str(s3) == s3_assert
- print "------------------"
- s5_assert = str(select([t1], and_(t1.c.col2==7, t1.c.col1==9)))
+ s5_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col1 == 9)))
class Vis(CloningVisitor):
def visit_binary(self, binary):
if binary.left is t1.c.col3:
@@ -513,8 +510,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adapt_union(self):
u = union(
- t1.select().where(t1.c.col1==4),
- t1.select().where(t1.c.col1==5)
+ t1.select().where(t1.c.col1 == 4),
+ t1.select().where(t1.c.col1 == 5)
).alias()
assert sql_util.ClauseAdapter(u).traverse(t1) is u
@@ -523,9 +520,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
"""test that unique bindparams change their name upon clone()
to prevent conflicts"""
- s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias()
+ s = select([t1], t1.c.col1 == bindparam(None, unique=True)).alias()
s2 = CloningVisitor().traverse(s).alias()
- s3 = select([s], s.c.col2==s2.c.col2)
+ s3 = select([s], s.c.col2 == s2.c.col2)
self.assert_compile(s3,
"SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
@@ -536,9 +533,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
"AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 "
"WHERE anon_1.col2 = anon_2.col2")
- s = select([t1], t1.c.col1==4).alias()
+ s = select([t1], t1.c.col1 == 4).alias()
s2 = CloningVisitor().traverse(s).alias()
- s3 = select([s], s.c.col2==s2.c.col2)
+ s3 = select([s], s.c.col2 == s2.c.col2)
self.assert_compile(s3,
"SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
"(SELECT table1.col1 AS col1, table1.col2 AS col2, "
@@ -567,7 +564,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
subq = t2.select().alias('subq')
s = select([t1.c.col1, subq.c.col1],
from_obj=[t1, subq,
- t1.join(subq, t1.c.col1==subq.c.col2)]
+ t1.join(subq, t1.c.col1 == subq.c.col2)]
)
orig = str(s)
s2 = CloningVisitor().traverse(s)
@@ -585,24 +582,24 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
subq = subq.alias('subq')
s = select([t1.c.col1, subq.c.col1],
from_obj=[t1, subq,
- t1.join(subq, t1.c.col1==subq.c.col2)]
+ t1.join(subq, t1.c.col1 == subq.c.col2)]
)
s5 = CloningVisitor().traverse(s)
assert orig == str(s) == str(s5)
def test_correlated_select(self):
- s = select(['*'], t1.c.col1==t2.c.col1,
+ s = select(['*'], t1.c.col1 == t2.c.col1,
from_obj=[t1, t2]).correlate(t2)
class Vis(CloningVisitor):
def visit_select(self, select):
- select.append_whereclause(t1.c.col2==7)
+ select.append_whereclause(t1.c.col2 == 7)
self.assert_compile(Vis().traverse(s),
"SELECT * FROM table1 WHERE table1.col1 = table2.col1 "
"AND table1.col2 = :col2_1")
def test_this_thing(self):
- s = select([t1]).where(t1.c.col1=='foo').alias()
+ s = select([t1]).where(t1.c.col1 == 'foo').alias()
s2 = select([s.c.col1])
self.assert_compile(s2,
@@ -622,7 +619,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
def test_select_fromtwice(self):
t1a = t1.alias()
- s = select([1], t1.c.col1==t1a.c.col1, from_obj=t1a).correlate(t1)
+ s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1)
self.assert_compile(s,
'SELECT 1 FROM table1 AS table1_1 WHERE '
'table1.col1 = table1_1.col1')
@@ -632,9 +629,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
'SELECT 1 FROM table1 AS table1_1 WHERE '
'table1.col1 = table1_1.col1')
- s = select([t1]).where(t1.c.col1=='foo').alias()
+ s = select([t1]).where(t1.c.col1 == 'foo').alias()
- s2 = select([1], t1.c.col1==s.c.col1, from_obj=s).correlate(t1)
+ s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1)
self.assert_compile(s2,
'SELECT 1 FROM (SELECT table1.col1 AS '
'col1, table1.col2 AS col2, table1.col3 AS '
@@ -759,7 +756,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
== addresses.c.user_id).correlate(users)
s = sql_util.ClauseAdapter(ualias).traverse(s)
- j1 = addresses.join(ualias, addresses.c.user_id==ualias.c.id)
+ j1 = addresses.join(ualias, addresses.c.user_id == ualias.c.id)
self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s),
'SELECT count(addresses.id) AS count_1 '
@@ -888,7 +885,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
b = Table('b', m, Column('x', Integer), Column('y', Integer))
c = Table('c', m, Column('x', Integer), Column('y', Integer))
- alias = select([a]).select_from(a.join(b, a.c.x==b.c.x)).alias()
+ alias = select([a]).select_from(a.join(b, a.c.x == b.c.x)).alias()
# two levels of indirection from c.x->b.x->a.x, requires recursive
# corresponding_column call
@@ -918,7 +915,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
j1 = a.outerjoin(b)
j2 = select([j1], use_labels=True)
- j3 = c.join(j2, j2.c.b_id==c.c.bid)
+ j3 = c.join(j2, j2.c.b_id == c.c.bid)
j4 = j3.outerjoin(d)
self.assert_compile(j4,
@@ -1040,7 +1037,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
sql_util.ClauseAdapter(u).\
- traverse(select([c.c.bid]).where(c.c.bid==u.c.b_aid)),
+ traverse(select([c.c.bid]).where(c.c.bid == u.c.b_aid)),
"SELECT c.bid "\
"FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid "
"FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id "
diff --git a/test/sql/test_operators.py b/test/sql/test_operators.py
index 24b458958..cde70d8da 100644
--- a/test/sql/test_operators.py
+++ b/test/sql/test_operators.py
@@ -4,11 +4,12 @@ from sqlalchemy.testing import assert_raises_message
from sqlalchemy.sql import column, desc, asc, literal, collate
from sqlalchemy.sql.expression import BinaryExpression, \
ClauseList, Grouping, \
- UnaryExpression
-from sqlalchemy.sql import operators
+ UnaryExpression, select, union
+from sqlalchemy.sql import operators, table
+from sqlalchemy import String, Integer
from sqlalchemy import exc
from sqlalchemy.schema import Column, Table, MetaData
-from sqlalchemy.types import Integer, TypeEngine, TypeDecorator, UserDefinedType
+from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType
from sqlalchemy.dialects import mysql, firebird
from sqlalchemy import text, literal_column
@@ -307,88 +308,339 @@ from sqlalchemy import and_, not_, between
class OperatorPrecedenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
- def test_operator_precedence(self):
- # TODO: clean up /break up
- metadata = MetaData()
- table = Table('op', metadata,
- Column('field', Integer))
- self.assert_compile(table.select((table.c.field == 5) == None),
+ table = table('op', column('field'))
+
+ def test_operator_precedence_1(self):
+ self.assert_compile(
+ self.table.select((self.table.c.field == 5) == None),
"SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL")
- self.assert_compile(table.select((table.c.field + 5) == table.c.field),
+
+ def test_operator_precedence_2(self):
+ self.assert_compile(
+ self.table.select(
+ (self.table.c.field + 5) == self.table.c.field),
"SELECT op.field FROM op WHERE op.field + :field_1 = op.field")
- self.assert_compile(table.select((table.c.field + 5) * 6),
+
+ def test_operator_precedence_3(self):
+ self.assert_compile(
+ self.table.select((self.table.c.field + 5) * 6),
"SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1")
- self.assert_compile(table.select((table.c.field * 5) + 6),
+
+ def test_operator_precedence_4(self):
+ self.assert_compile(self.table.select((self.table.c.field * 5) + 6),
"SELECT op.field FROM op WHERE op.field * :field_1 + :param_1")
- self.assert_compile(table.select(5 + table.c.field.in_([5, 6])),
+
+ def test_operator_precedence_5(self):
+ self.assert_compile(self.table.select(
+ 5 + self.table.c.field.in_([5, 6])),
"SELECT op.field FROM op WHERE :param_1 + "
"(op.field IN (:field_1, :field_2))")
- self.assert_compile(table.select((5 + table.c.field).in_([5, 6])),
+
+ def test_operator_precedence_6(self):
+ self.assert_compile(self.table.select(
+ (5 + self.table.c.field).in_([5, 6])),
"SELECT op.field FROM op WHERE :field_1 + op.field "
"IN (:param_1, :param_2)")
- self.assert_compile(table.select(not_(and_(table.c.field == 5,
- table.c.field == 7))),
+
+ def test_operator_precedence_7(self):
+ self.assert_compile(self.table.select(
+ not_(and_(self.table.c.field == 5,
+ self.table.c.field == 7))),
"SELECT op.field FROM op WHERE NOT "
"(op.field = :field_1 AND op.field = :field_2)")
- self.assert_compile(table.select(not_(table.c.field == 5)),
+
+ def test_operator_precedence_8(self):
+ self.assert_compile(self.table.select(not_(self.table.c.field == 5)),
"SELECT op.field FROM op WHERE op.field != :field_1")
- self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
+
+ def test_operator_precedence_9(self):
+ self.assert_compile(self.table.select(
+ not_(self.table.c.field.between(5, 6))),
"SELECT op.field FROM op WHERE NOT "
"(op.field BETWEEN :field_1 AND :field_2)")
- self.assert_compile(table.select(not_(table.c.field) == 5),
+
+ def test_operator_precedence_10(self):
+ self.assert_compile(self.table.select(not_(self.table.c.field) == 5),
"SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
- self.assert_compile(table.select((table.c.field == table.c.field).\
+
+ def test_operator_precedence_11(self):
+ self.assert_compile(self.table.select(
+ (self.table.c.field == self.table.c.field).\
between(False, True)),
"SELECT op.field FROM op WHERE (op.field = op.field) "
"BETWEEN :param_1 AND :param_2")
- self.assert_compile(table.select(
- between((table.c.field == table.c.field), False, True)),
+
+ def test_operator_precedence_12(self):
+ self.assert_compile(self.table.select(
+ between((self.table.c.field == self.table.c.field),
+ False, True)),
"SELECT op.field FROM op WHERE (op.field = op.field) "
"BETWEEN :param_1 AND :param_2")
- self.assert_compile(table.select(
- table.c.field.match(table.c.field).is_(None)),
+ def test_operator_precedence_13(self):
+ self.assert_compile(self.table.select(
+ self.table.c.field.match(
+ self.table.c.field).is_(None)),
"SELECT op.field FROM op WHERE (op.field MATCH op.field) IS NULL")
class OperatorAssociativityTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
- def test_associativity(self):
- # TODO: clean up /break up
+ def test_associativity_1(self):
f = column('f')
self.assert_compile(f - f, "f - f")
+
+ def test_associativity_2(self):
+ f = column('f')
self.assert_compile(f - f - f, "(f - f) - f")
+ def test_associativity_3(self):
+ f = column('f')
self.assert_compile((f - f) - f, "(f - f) - f")
+
+ def test_associativity_4(self):
+ f = column('f')
self.assert_compile((f - f).label('foo') - f, "(f - f) - f")
+
+ def test_associativity_5(self):
+ f = column('f')
self.assert_compile(f - (f - f), "f - (f - f)")
+
+ def test_associativity_6(self):
+ f = column('f')
self.assert_compile(f - (f - f).label('foo'), "f - (f - f)")
+ def test_associativity_7(self):
+ f = column('f')
# because - less precedent than /
self.assert_compile(f / (f - f), "f / (f - f)")
+
+ def test_associativity_8(self):
+ f = column('f')
self.assert_compile(f / (f - f).label('foo'), "f / (f - f)")
+ def test_associativity_9(self):
+ f = column('f')
self.assert_compile(f / f - f, "f / f - f")
+
+ def test_associativity_10(self):
+ f = column('f')
self.assert_compile((f / f) - f, "f / f - f")
+
+ def test_associativity_11(self):
+ f = column('f')
self.assert_compile((f / f).label('foo') - f, "f / f - f")
+ def test_associativity_12(self):
+ f = column('f')
# because / more precedent than -
self.assert_compile(f - (f / f), "f - f / f")
+
+ def test_associativity_13(self):
+ f = column('f')
self.assert_compile(f - (f / f).label('foo'), "f - f / f")
+
+ def test_associativity_14(self):
+ f = column('f')
self.assert_compile(f - f / f, "f - f / f")
+
+ def test_associativity_15(self):
+ f = column('f')
self.assert_compile((f - f) / f, "(f - f) / f")
+ def test_associativity_16(self):
+ f = column('f')
self.assert_compile(((f - f) / f) - f, "(f - f) / f - f")
+
+ def test_associativity_17(self):
+ f = column('f')
self.assert_compile((f - f) / (f - f), "(f - f) / (f - f)")
+ def test_associativity_18(self):
+ f = column('f')
# higher precedence
self.assert_compile((f / f) - (f / f), "f / f - f / f")
+ def test_associativity_19(self):
+ f = column('f')
self.assert_compile((f / f) - (f - f), "f / f - (f - f)")
+
+ def test_associativity_20(self):
+ f = column('f')
self.assert_compile((f / f) / (f - f), "(f / f) / (f - f)")
+
+ def test_associativity_21(self):
+ f = column('f')
self.assert_compile(f / (f / (f - f)), "f / (f / (f - f))")
+class InTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String),
+ column('description', String),
+ )
+ table2 = table(
+ 'myothertable',
+ column('otherid', Integer),
+ column('othername', String),
+ )
+
+ def test_in_1(self):
+ self.assert_compile(self.table1.c.myid.in_(['a']),
+ "mytable.myid IN (:myid_1)")
+
+ def test_in_2(self):
+ self.assert_compile(~self.table1.c.myid.in_(['a']),
+ "mytable.myid NOT IN (:myid_1)")
+
+ def test_in_3(self):
+ self.assert_compile(self.table1.c.myid.in_(['a', 'b']),
+ "mytable.myid IN (:myid_1, :myid_2)")
+
+ def test_in_4(self):
+ self.assert_compile(self.table1.c.myid.in_(iter(['a', 'b'])),
+ "mytable.myid IN (:myid_1, :myid_2)")
+
+ def test_in_5(self):
+ self.assert_compile(self.table1.c.myid.in_([literal('a')]),
+ "mytable.myid IN (:param_1)")
+
+ def test_in_6(self):
+ self.assert_compile(self.table1.c.myid.in_([literal('a'), 'b']),
+ "mytable.myid IN (:param_1, :myid_1)")
+
+ def test_in_7(self):
+ self.assert_compile(
+ self.table1.c.myid.in_([literal('a'), literal('b')]),
+ "mytable.myid IN (:param_1, :param_2)")
+
+ def test_in_8(self):
+ self.assert_compile(self.table1.c.myid.in_(['a', literal('b')]),
+ "mytable.myid IN (:myid_1, :param_1)")
+
+ def test_in_9(self):
+ self.assert_compile(self.table1.c.myid.in_([literal(1) + 'a']),
+ "mytable.myid IN (:param_1 + :param_2)")
+
+ def test_in_10(self):
+ self.assert_compile(self.table1.c.myid.in_([literal('a') + 'a', 'b']),
+ "mytable.myid IN (:param_1 || :param_2, :myid_1)")
+
+ def test_in_11(self):
+ self.assert_compile(self.table1.c.myid.in_([literal('a') + \
+ literal('a'), literal('b')]),
+ "mytable.myid IN (:param_1 || :param_2, :param_3)")
+
+ def test_in_12(self):
+ self.assert_compile(self.table1.c.myid.in_([1, literal(3) + 4]),
+ "mytable.myid IN (:myid_1, :param_1 + :param_2)")
+
+ def test_in_13(self):
+ self.assert_compile(self.table1.c.myid.in_([literal('a') < 'b']),
+ "mytable.myid IN (:param_1 < :param_2)")
+
+ def test_in_14(self):
+ self.assert_compile(self.table1.c.myid.in_([self.table1.c.myid]),
+ "mytable.myid IN (mytable.myid)")
+
+ def test_in_15(self):
+ self.assert_compile(self.table1.c.myid.in_(['a', self.table1.c.myid]),
+ "mytable.myid IN (:myid_1, mytable.myid)")
+
+ def test_in_16(self):
+ self.assert_compile(self.table1.c.myid.in_([literal('a'),
+ self.table1.c.myid]),
+ "mytable.myid IN (:param_1, mytable.myid)")
+
+ def test_in_17(self):
+ self.assert_compile(self.table1.c.myid.in_([literal('a'), \
+ self.table1.c.myid + 'a']),
+ "mytable.myid IN (:param_1, mytable.myid + :myid_1)")
+
+ def test_in_18(self):
+ self.assert_compile(self.table1.c.myid.in_([literal(1), 'a' + \
+ self.table1.c.myid]),
+ "mytable.myid IN (:param_1, :myid_1 + mytable.myid)")
+
+ def test_in_19(self):
+ self.assert_compile(self.table1.c.myid.in_([1, 2, 3]),
+ "mytable.myid IN (:myid_1, :myid_2, :myid_3)")
+
+ def test_in_20(self):
+ self.assert_compile(self.table1.c.myid.in_(
+ select([self.table2.c.otherid])),
+ "mytable.myid IN (SELECT myothertable.otherid FROM myothertable)")
+
+ def test_in_21(self):
+ self.assert_compile(~self.table1.c.myid.in_(
+ select([self.table2.c.otherid])),
+ "mytable.myid NOT IN (SELECT myothertable.otherid FROM myothertable)")
+
+ def test_in_22(self):
+ self.assert_compile(
+ self.table1.c.myid.in_(
+ text("SELECT myothertable.otherid FROM myothertable")
+ ),
+ "mytable.myid IN (SELECT myothertable.otherid "
+ "FROM myothertable)"
+ )
+
+ @testing.emits_warning('.*empty sequence.*')
+ def test_in_23(self):
+ self.assert_compile(self.table1.c.myid.in_([]),
+ "mytable.myid != mytable.myid")
+
+ def test_in_24(self):
+ self.assert_compile(
+ select([self.table1.c.myid.in_(select([self.table2.c.otherid]))]),
+ "SELECT mytable.myid IN (SELECT myothertable.otherid "
+ "FROM myothertable) AS anon_1 FROM mytable"
+ )
+
+ def test_in_25(self):
+ self.assert_compile(
+ select([self.table1.c.myid.in_(
+ select([self.table2.c.otherid]).as_scalar())]),
+ "SELECT mytable.myid IN (SELECT myothertable.otherid "
+ "FROM myothertable) AS anon_1 FROM mytable"
+ )
+
+ def test_in_26(self):
+ self.assert_compile(self.table1.c.myid.in_(
+ union(
+ select([self.table1.c.myid], self.table1.c.myid == 5),
+ select([self.table1.c.myid], self.table1.c.myid == 12),
+ )
+ ), "mytable.myid IN ("\
+ "SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1 "\
+ "UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)")
+
+ def test_in_27(self):
+ # test that putting a select in an IN clause does not
+ # blow away its ORDER BY clause
+ self.assert_compile(
+ select([self.table1, self.table2],
+ self.table2.c.otherid.in_(
+ select([self.table2.c.otherid],
+ order_by=[self.table2.c.othername],
+ limit=10, correlate=False)
+ ),
+ from_obj=[self.table1.join(self.table2,
+ self.table1.c.myid == self.table2.c.otherid)],
+ order_by=[self.table1.c.myid]
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable "\
+ "JOIN myothertable ON mytable.myid = myothertable.otherid "
+ "WHERE myothertable.otherid IN (SELECT myothertable.otherid "\
+ "FROM myothertable ORDER BY myothertable.othername "
+ "LIMIT :param_1) ORDER BY mytable.myid",
+ {'param_1': 10}
+ )
+
class ComposedLikeOperatorsTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'