summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES9
-rw-r--r--lib/sqlalchemy/orm/mapper.py20
-rw-r--r--lib/sqlalchemy/orm/strategies.py19
-rw-r--r--lib/sqlalchemy/sql/compiler.py29
-rw-r--r--lib/sqlalchemy/sql/expression.py23
-rw-r--r--test/dialect/firebird.py2
-rwxr-xr-xtest/dialect/mssql.py8
-rw-r--r--test/dialect/oracle.py6
-rw-r--r--test/orm/eager_relations.py6
-rw-r--r--test/orm/query.py14
-rw-r--r--test/sql/generative.py37
-rw-r--r--test/sql/query.py19
-rw-r--r--test/sql/select.py224
-rwxr-xr-xtest/sql/selectable.py2
14 files changed, 244 insertions, 174 deletions
diff --git a/CHANGES b/CHANGES
index d73ea69f0..99a7edcff 100644
--- a/CHANGES
+++ b/CHANGES
@@ -22,6 +22,15 @@ CHANGES
make proper use of Python unicode objects (i.e. u'hello' and not 'hello')
so that data round trips accurately.
+ - generation of "unique" bind parameters has been simplified to use the same
+ "unique identifier" mechanisms as everything else. This doesn't affect
+ user code, except any code that might have been hardcoded against the generated
+ names. Generated bind params now have the form "<paramname>_<num>",
+ whereas before only the second bind of the same name would have this form.
+
+ - bindparam() objects themselves can be used as keys for execute(), i.e.
+ statement.execute({bind1:'foo', bind2:'bar'})
+
- tables with schemas can still be used in sqlite, firebird,
schema name just gets dropped [ticket:890]
diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py
index 18c639af0..5d727bd6a 100644
--- a/lib/sqlalchemy/orm/mapper.py
+++ b/lib/sqlalchemy/orm/mapper.py
@@ -1066,9 +1066,9 @@ class Mapper(object):
mapper = table_to_mapper[table]
clause = sql.and_()
for col in mapper._pks_by_table[table]:
- clause.clauses.append(col == sql.bindparam(col._label, type_=col.type, unique=True))
+ clause.clauses.append(col == sql.bindparam(col._label, type_=col.type))
if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):
- clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col._label, type_=col.type, unique=True))
+ clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col._label, type_=col.type))
statement = table.update(clause)
rows = 0
supports_sane_rowcount = True
@@ -1210,9 +1210,9 @@ class Mapper(object):
del_objects.sort(comparator)
clause = sql.and_()
for col in mapper._pks_by_table[table]:
- clause.clauses.append(col == sql.bindparam(col.key, type_=col.type, unique=True))
+ clause.clauses.append(col == sql.bindparam(col.key, type_=col.type))
if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col):
- clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type, unique=True))
+ clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type))
statement = table.delete(clause)
c = connection.execute(statement, del_objects)
if c.supports_sane_multi_rowcount() and c.rowcount != len(del_objects):
@@ -1389,11 +1389,11 @@ class Mapper(object):
if leftcol is None or rightcol is None:
return
if leftcol.table not in needs_tables:
- binary.left = sql.bindparam(leftcol.name, None, type_=binary.right.type, unique=True)
- param_names.append(leftcol)
+ binary.left = sql.bindparam(None, None, type_=binary.right.type)
+ param_names.append((leftcol, binary.left))
elif rightcol not in needs_tables:
- binary.right = sql.bindparam(rightcol.name, None, type_=binary.right.type, unique=True)
- param_names.append(rightcol)
+ binary.right = sql.bindparam(None, None, type_=binary.right.type)
+ param_names.append((rightcol, binary.right))
allconds = []
param_names = []
@@ -1487,8 +1487,8 @@ class Mapper(object):
identitykey = self.identity_key_from_instance(instance)
params = {}
- for c in param_names:
- params[c.name] = self._get_attr_by_column(instance, c)
+ for c, bind in param_names:
+ params[bind] = self._get_attr_by_column(instance, c)
row = selectcontext.session.connection(self).execute(statement, params).fetchone()
self.populate_instance(selectcontext, instance, row, isnew=False, instancekey=identitykey, ispostselect=True)
diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py
index 3c647ac60..5a765fbd3 100644
--- a/lib/sqlalchemy/orm/strategies.py
+++ b/lib/sqlalchemy/orm/strategies.py
@@ -108,8 +108,8 @@ class ColumnLoader(LoaderStrategy):
statement = sql.select(needs_tables, cond, use_labels=True)
def create_statement(instance):
params = {}
- for c in param_names:
- params[c.name] = mapper._get_attr_by_column(instance, c)
+ for (c, bind) in param_names:
+ params[bind] = mapper._get_attr_by_column(instance, c)
return (statement, params)
def new_execute(instance, row, isnew, **flags):
@@ -297,12 +297,11 @@ class LazyLoader(AbstractRelationLoader):
(criterion, lazybinds, rev) = LazyLoader._create_lazy_clause(self.parent_property, reverse_direction=reverse_direction)
bind_to_col = dict([(lazybinds[col].key, col) for col in lazybinds])
- class Visitor(visitors.ClauseVisitor):
- def visit_bindparam(s, bindparam):
- mapper = reverse_direction and self.parent_property.mapper or self.parent_property.parent
- if bindparam.key in bind_to_col:
- bindparam.value = mapper._get_attr_by_column(instance, bind_to_col[bindparam.key])
- return Visitor().traverse(criterion, clone=True)
+ def visit_bindparam(bindparam):
+ mapper = reverse_direction and self.parent_property.mapper or self.parent_property.parent
+ if bindparam.key in bind_to_col:
+ bindparam.value = mapper._get_attr_by_column(instance, bind_to_col[bindparam.key])
+ return visitors.traverse(criterion, clone=True, visit_bindparam=visit_bindparam)
def setup_loader(self, instance, options=None, path=None):
if not mapper.has_mapper(instance):
@@ -416,7 +415,7 @@ class LazyLoader(AbstractRelationLoader):
if should_bind(leftcol, rightcol):
col = leftcol
binary.left = binds.setdefault(leftcol,
- sql.bindparam(None, None, type_=binary.right.type, unique=True))
+ sql.bindparam(None, None, type_=binary.right.type))
reverse[rightcol] = binds[col]
# the "left is not right" compare is to handle part of a join clause that is "table.c.col1==table.c.col1",
@@ -424,7 +423,7 @@ class LazyLoader(AbstractRelationLoader):
if leftcol is not rightcol and should_bind(rightcol, leftcol):
col = rightcol
binary.right = binds.setdefault(rightcol,
- sql.bindparam(None, None, type_=binary.left.type, unique=True))
+ sql.bindparam(None, None, type_=binary.left.type))
reverse[leftcol] = binds[col]
lazywhere = primaryjoin
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index 749ce4c10..aec75e76c 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -196,7 +196,7 @@ class DefaultCompiler(engine.Compiled):
if params:
pd = {}
for bindparam, name in self.bind_names.iteritems():
- for paramname in (bindparam.key, bindparam.shortname, name):
+ for paramname in (bindparam, bindparam.key, bindparam.shortname, name):
if paramname in params:
pd[name] = params[paramname]
break
@@ -373,26 +373,13 @@ class DefaultCompiler(engine.Compiled):
return self.operators.get(operator, str(operator))
def visit_bindparam(self, bindparam, **kwargs):
- # TODO: remove this whole "unique" thing, just use regular
- # anonymous params to implement. params used for inserts/updates
- # etc. should no longer be "unique".
- if bindparam.unique:
- count = 1
- key = bindparam.key
- # redefine the generated name of the bind param in the case
- # that we have multiple conflicting bind parameters.
- while self.binds.setdefault(key, bindparam) is not bindparam:
- tag = "_%d" % count
- key = bindparam.key + tag
- count += 1
- bindparam.key = key
- return self.bindparam_string(self._truncate_bindparam(bindparam))
- else:
- existing = self.binds.get(bindparam.key)
- if existing is not None and existing.unique:
+ name = self._truncate_bindparam(bindparam)
+ if name in self.binds:
+ existing = self.binds[name]
+ if existing.unique or bindparam.unique:
raise exceptions.CompileError("Bind parameter '%s' conflicts with unique bind parameter of the same name" % bindparam.key)
- self.binds[bindparam.key] = bindparam
- return self.bindparam_string(self._truncate_bindparam(bindparam))
+ self.binds[bindparam.key] = self.binds[name] = bindparam
+ return self.bindparam_string(name)
def _truncate_bindparam(self, bindparam):
if bindparam in self.bind_names:
@@ -632,7 +619,7 @@ class DefaultCompiler(engine.Compiled):
"""
def create_bind_param(col, value):
- bindparam = sql.bindparam(col.key, value, type_=col.type, unique=True)
+ bindparam = sql.bindparam(col.key, value, type_=col.type)
self.binds[col.key] = bindparam
return self.bindparam_string(self._truncate_bindparam(bindparam))
diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py
index 6ad29218f..732d4fdf9 100644
--- a/lib/sqlalchemy/sql/expression.py
+++ b/lib/sqlalchemy/sql/expression.py
@@ -922,7 +922,7 @@ class ClauseElement(object):
if bind.key in kwargs:
bind.value = kwargs[bind.key]
if unique:
- bind.unique=True
+ bind._convert_to_unique()
return Vis().traverse(self, clone=True)
def compare(self, other):
@@ -1749,10 +1749,14 @@ class _BindParamClause(ClauseElement, _CompareMixin):
if True, the parameter should be treated like a stored procedure "OUT"
parameter.
"""
-
- self.key = key or "{ANON %d param}" % id(self)
- self.value = value
+
+ if unique:
+ self.key = "{ANON %d %s}" % (id(self), key or 'param')
+ else:
+ self.key = key or "{ANON %d param}" % id(self)
+ self._orig_key = key
self.unique = unique
+ self.value = value
self.isoutparam = isoutparam
self.shortname = shortname
@@ -1778,6 +1782,17 @@ class _BindParamClause(ClauseElement, _CompareMixin):
type(None):sqltypes.NullType
}
+ def _clone(self):
+ c = ClauseElement._clone(self)
+ if self.unique:
+ c.key = "{ANON %d %s}" % (id(c), c._orig_key or 'param')
+ return c
+
+ def _convert_to_unique(self):
+ if not self.unique:
+ self.unique=True
+ self.key = "{ANON %d %s}" % (id(self), self._orig_key or 'param')
+
def _get_from_objects(self, **modifiers):
return []
diff --git a/test/dialect/firebird.py b/test/dialect/firebird.py
index 0abbdf029..636ab2d05 100644
--- a/test/dialect/firebird.py
+++ b/test/dialect/firebird.py
@@ -21,7 +21,7 @@ class CompileTest(SQLCompileTest):
self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable sometable_1")
def test_function(self):
- self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)")
+ self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo")
diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py
index 3b40ed354..1a8504701 100755
--- a/test/dialect/mssql.py
+++ b/test/dialect/mssql.py
@@ -16,7 +16,7 @@ class CompileTest(SQLCompileTest):
def test_update(self):
t = table('sometable', column('somecolumn'))
- self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn", dict(somecolumn=10))
+ self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn_1", dict(somecolumn=10))
def test_count(self):
t = table('sometable', column('somecolumn'))
@@ -40,12 +40,12 @@ class CompileTest(SQLCompileTest):
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
)
u = union(s1, s2, order_by=['col3', 'col4'])
- self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1) ORDER BY col3, col4")
+ self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2_1, :t1_col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_3, :t2_col2_4) ORDER BY col3, col4")
- self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1)) AS bar")
+ self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2_1, :t1_col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_3, :t2_col2_4)) AS bar")
def test_function(self):
- self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)")
+ self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo()")
diff --git a/test/dialect/oracle.py b/test/dialect/oracle.py
index fa65837f0..79d73fb89 100644
--- a/test/dialect/oracle.py
+++ b/test/dialect/oracle.py
@@ -95,8 +95,8 @@ class CompileTest(SQLCompileTest):
self.assert_compile(query,
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
-(mytable.name = :mytable_name OR mytable.myid = :mytable_myid OR \
-myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo where boo = lar))",
+(mytable.name = :mytable_name_1 OR mytable.myid = :mytable_myid_2 OR \
+myothertable.othername != :myothertable_othername_3 OR EXISTS (select yay from foo where boo = lar))",
dialect=oracle.OracleDialect(use_ansi = False))
query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid)
@@ -125,7 +125,7 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo
order_by(addresses.oid_column, address_types.oid_column)
self.assert_compile(s, "SELECT address_types_1.id, address_types_1.name, addresses.id, addresses.user_id, "
"addresses.address_type_id, addresses.email_address FROM addresses LEFT OUTER JOIN address_types address_types_1 "
- "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :addresses_user_id ORDER BY addresses.rowid, "
+ "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :addresses_user_id_2 ORDER BY addresses.rowid, "
"address_types.rowid")
class TypesTest(SQLCompileTest):
diff --git a/test/orm/eager_relations.py b/test/orm/eager_relations.py
index acb5e0da7..6b2baad95 100644
--- a/test/orm/eager_relations.py
+++ b/test/orm/eager_relations.py
@@ -700,11 +700,11 @@ class SelfReferentialEagerTest(ORMTest):
if testing.against('sqlite'):
self.assert_sql(testbase.db, go, [
(
- "SELECT nodes.id AS nodes_id, nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM nodes WHERE nodes.data = :nodes_data ORDER BY nodes.oid LIMIT 1 OFFSET 0",
- {'nodes_data': 'n1'}
+ "SELECT nodes.id AS nodes_id, nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM nodes WHERE nodes.data = :nodes_data_1 ORDER BY nodes.oid LIMIT 1 OFFSET 0",
+ {'nodes_data_1': 'n1'}
),
])
-
+
@testing.fails_on('maxdb')
def test_no_depth(self):
class Node(Base):
diff --git a/test/orm/query.py b/test/orm/query.py
index 4e5f04855..d4a69d249 100644
--- a/test/orm/query.py
+++ b/test/orm/query.py
@@ -208,9 +208,9 @@ class OperatorTest(QueryTest):
(operator.sub, '-'), (operator.div, '/'),
):
for (lhs, rhs, res) in (
- (5, User.id, ':users_id %s users.id'),
+ (5, User.id, ':users_id_1 %s users.id'),
(5, literal(6), ':param_1 %s :param_2'),
- (User.id, 5, 'users.id %s :users_id'),
+ (User.id, 5, 'users.id %s :users_id_1'),
(User.id, literal('b'), 'users.id %s :param_1'),
(User.id, User.id, 'users.id %s users.id'),
(literal(5), 'b', ':param_1 %s :param_2'),
@@ -228,9 +228,9 @@ class OperatorTest(QueryTest):
(operator.le, '<=', '>='),
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
- ('a', User.id, ':users_id', 'users.id'),
+ ('a', User.id, ':users_id_1', 'users.id'),
('a', literal('b'), ':param_2', ':param_1'), # note swap!
- (User.id, 'b', 'users.id', ':users_id'),
+ (User.id, 'b', 'users.id', ':users_id_1'),
(User.id, literal('b'), 'users.id', ':param_1'),
(User.id, User.id, 'users.id', 'users.id'),
(literal('a'), 'b', ':param_1', ':param_2'),
@@ -249,15 +249,15 @@ class OperatorTest(QueryTest):
fwd_sql + "'\n or\n'" + rev_sql + "'")
def test_op(self):
- assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name"
+ assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name_1"
def test_in(self):
self._test(User.id.in_(['a', 'b']),
- "users.id IN (:users_id, :users_id_1)")
+ "users.id IN (:users_id_1, :users_id_2)")
def test_between(self):
self._test(User.id.between('a', 'b'),
- "users.id BETWEEN :users_id AND :users_id_1")
+ "users.id BETWEEN :users_id_1 AND :users_id_2")
def test_clauses(self):
for (expr, compare) in (
diff --git a/test/sql/generative.py b/test/sql/generative.py
index 35a1cc2b1..b404b0017 100644
--- a/test/sql/generative.py
+++ b/test/sql/generative.py
@@ -218,13 +218,33 @@ class ClauseTest(SQLCompileTest):
def visit_binary(self, binary):
if binary.left is t1.c.col3:
binary.left = t1.c.col1
- binary.right = bindparam("table1_col1")
+ binary.right = bindparam("table1_col1", unique=True)
s5 = Vis().traverse(s4, clone=True)
print str(s4)
print str(s5)
assert str(s5) == s5_assert
assert str(s4) == s4_assert
-
+
+ def test_binds(self):
+ """test that unique bindparams change their name upon clone() to prevent conflicts"""
+
+ s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias()
+ s2 = ClauseVisitor().traverse(s, clone=True).alias()
+ s3 = select([s], s.c.col2==s2.c.col2)
+
+ self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\
+ "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_1, "\
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_3) AS anon_4 "\
+ "WHERE anon_1.col2 = anon_4.col2")
+
+ s = select([t1], t1.c.col1==4).alias()
+ s2 = ClauseVisitor().traverse(s, clone=True).alias()
+ s3 = select([s], s.c.col2==s2.c.col2)
+ self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\
+ "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_2) AS anon_1, "\
+ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_3) AS anon_4 "\
+ "WHERE anon_1.col2 = anon_4.col2")
+
def test_alias(self):
subq = t2.select().alias('subq')
s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)])
@@ -247,7 +267,7 @@ class ClauseTest(SQLCompileTest):
def visit_select(self, select):
select.append_whereclause(t1.c.col2==7)
- self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2")
+ self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2_1")
def test_clause_adapter(self):
@@ -358,13 +378,18 @@ class SelectTest(SQLCompileTest):
)
def test_select(self):
- self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1 ORDER BY table1.col3")
+ self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3),
+ "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1_1 ORDER BY table1.col3")
- self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3")
+ self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3),
+ "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
+ "FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3")
s = select([t2], t2.c.col1==t1.c.col1, correlate=False)
s = s.correlate(t1).order_by(t2.c.col3)
- self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3")
+ self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3),
+ "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\
+ "FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3")
def test_columns(self):
s = t1.select()
diff --git a/test/sql/query.py b/test/sql/query.py
index 9b35cff1c..6233de743 100644
--- a/test/sql/query.py
+++ b/test/sql/query.py
@@ -174,6 +174,25 @@ class QueryTest(PersistTest):
r = s.execute(userid='fred').fetchall()
assert len(r) == 1
+ def test_unique_conflict(self):
+ u = bindparam('userid', unique=True)
+ s = users.select(or_(users.c.user_name==u, users.c.user_name==u))
+ try:
+ str(s)
+ assert False
+ except exceptions.CompileError, e:
+ assert str(e) == "Bind parameter '{ANON %d userid}' conflicts with unique bind parameter of the same name" % id(u)
+
+ def test_bindparams_in_params(self):
+ """test that a _BindParamClause itself can be a key in the params dict"""
+
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+
+ u = bindparam('userid')
+ r = users.select(users.c.user_name==u).execute({u:'fred'}).fetchall()
+ assert len(r) == 1
+
def test_bindparam_shortname(self):
"""test the 'shortname' field on BindParamClause."""
users.insert().execute(user_id = 7, user_name = 'jack')
diff --git a/test/sql/select.py b/test/sql/select.py
index 58c4ea3dd..d36703af4 100644
--- a/test/sql/select.py
+++ b/test/sql/select.py
@@ -84,7 +84,8 @@ myothertable.othername FROM mytable, myothertable")
s.c.myid == 7
)
,
- "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid")
+ "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable "\
+ "WHERE mytable.name = :mytable_name_1) WHERE myid = :myid_2")
sq = select([table1])
self.assert_compile(
@@ -99,7 +100,7 @@ myothertable.othername FROM mytable, myothertable")
self.assert_compile(
sq.select(sq.c.myid == 7),
"SELECT sq.myid, sq.name, sq.description FROM \
-(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid"
+(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid_1"
)
sq = select(
@@ -111,7 +112,7 @@ myothertable.othername FROM mytable, myothertable")
sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \
mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \
myothertable.othername AS myothertable_othername FROM mytable, myothertable \
-WHERE mytable.myid = :mytable_myid AND myothertable.otherid = mytable.myid"
+WHERE mytable.myid = :mytable_myid_1 AND myothertable.otherid = mytable.myid"
self.assert_compile(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \
sq.myothertable_othername FROM (" + sqstring + ") AS sq")
@@ -148,7 +149,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""")
def testexistsascolumnclause(self):
- self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5})
+ self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)", params={'mytable_myid':5})
self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={})
@@ -183,7 +184,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
self.assert_compile(
table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name)"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name_1)"
)
self.assert_compile(
@@ -279,8 +280,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
)
self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE "
- "zips.zipcode = :zips_zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_1)) AS dist "
- "FROM places, zips WHERE zips.zipcode = :zips_zipcode_2 ORDER BY dist, places.nm")
+ "zips.zipcode = :zips_zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_2)) AS dist "
+ "FROM places, zips WHERE zips.zipcode = :zips_zipcode_3 ORDER BY dist, places.nm")
zalias = zips.alias('main_zip')
qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True)
@@ -303,7 +304,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def testand(self):
self.assert_compile(
select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")),
- "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()"
+ "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid_1 AND mytable.name = :mytable_name_2 "\
+ "AND myothertable.othername = :myothertable_othername_3 AND sysdate() = today()"
)
def testor(self):
@@ -313,8 +315,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9),
"sysdate() = today()",
)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_othername_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()",
- checkparams = {'myothertable_othername': 'asdf', 'myothertable_othername_1':'foo', 'myothertable_otherid': 9, 'mytable_myid': 12}
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid_1 AND (myothertable.othername = :myothertable_othername_2 OR myothertable.othername = :myothertable_othername_3 OR myothertable.otherid = :myothertable_otherid_4) AND sysdate() = today()",
+ checkparams = {'myothertable_othername_2': 'asdf', 'myothertable_othername_3':'foo', 'myothertable_otherid_4': 9, 'mytable_myid_1': 12}
)
def testdistinct(self):
@@ -344,9 +346,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
(operator.sub, '-'), (operator.div, '/'),
):
for (lhs, rhs, res) in (
- (5, table1.c.myid, ':mytable_myid %s mytable.myid'),
+ (5, table1.c.myid, ':mytable_myid_1 %s mytable.myid'),
(5, literal(5), ':param_1 %s :param_2'),
- (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid'),
+ (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid_1'),
(table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
(literal(5), 8, ':param_1 %s :param_2'),
@@ -363,9 +365,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
(operator.le, '<=', '>='),
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
- ('a', table1.c.myid, ':mytable_myid', 'mytable.myid'),
+ ('a', table1.c.myid, ':mytable_myid_1', 'mytable.myid'),
('a', literal('b'), ':param_2', ':param_1'), # note swap!
- (table1.c.myid, 'b', 'mytable.myid', ':mytable_myid'),
+ (table1.c.myid, 'b', 'mytable.myid', ':mytable_myid_1'),
(table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
(literal('a'), 'b', ':param_1', ':param_2'),
@@ -385,22 +387,24 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
self.assert_compile(
table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND mytable.name != :mytable_name"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND mytable.name != :mytable_name_2"
)
self.assert_compile(
table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name BETWEEN :mytable_name AND :mytable_name_1)"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\
+ "NOT (mytable.name BETWEEN :mytable_name_2 AND :mytable_name_3)"
)
self.assert_compile(
table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name AND mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2)"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\
+ "NOT (mytable.name = :mytable_name_2 AND mytable.name = :mytable_name_3 AND mytable.name = :mytable_name_4)"
)
self.assert_compile(
table1.select((table1.c.myid != 12) & ~table1.c.name),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND NOT mytable.name"
)
self.assert_compile(
@@ -410,7 +414,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
# test the op() function, also that its results are further usable in expressions
self.assert_compile(
table1.select(table1.c.myid.op('hoho')(12)==14),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :param_1"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid_1) = :param_2"
)
# test that clauses can be pickled (operators need to be module-level, etc.)
@@ -420,27 +424,27 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def testextracomparisonoperators(self):
self.assert_compile(
table1.select(table1.c.name.contains('jo')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name",
- checkparams = {'mytable_name': u'%jo%'},
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name_1",
+ checkparams = {'mytable_name_1': u'%jo%'},
)
self.assert_compile(
table1.select(table1.c.name.endswith('hn')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name",
- checkparams = {'mytable_name': u'%hn'},
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name_1",
+ checkparams = {'mytable_name_1': u'%hn'},
)
def testunicodestartswith(self):
string = u"hi \xf6 \xf5"
self.assert_compile(
table1.select(table1.c.name.startswith(string)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name",
- checkparams = {'mytable_name': u'hi \xf6 \xf5%'},
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name_1",
+ checkparams = {'mytable_name_1': u'hi \xf6 \xf5%'},
)
def testmultiparam(self):
self.assert_compile(
select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')),
- "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2"
+ "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2 OR mytable.myid = :mytable_myid_3"
)
def testorderby(self):
@@ -468,17 +472,17 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
)
def testforupdate(self):
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE")
+ self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE")
+ self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE NOWAIT", dialect=oracle.dialect())
+ self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect())
self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect())
self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE", dialect=oracle.dialect())
+ self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE", dialect=oracle.dialect())
def testalias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
@@ -512,7 +516,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
-WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid"
+WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid_1"
)
@@ -670,7 +674,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
"""tests the generation of functions using the func keyword"""
# test an expression with a function
self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
- "lala(:lala, :lala_1, :param_1, mytable.myid) * myothertable.otherid")
+ "lala(:lala_1, :lala_2, :param_3, mytable.myid) * myothertable.otherid")
# test it in a SELECT
self.assert_compile(select([func.count(table1.c.myid)]),
@@ -683,16 +687,16 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
# test the bind parameter name with a "dotted" function name is only the name
# (limits the length of the bind param name)
self.assert_compile(select([func.foo.bar.lala(12)]),
- "SELECT foo.bar.lala(:lala)")
+ "SELECT foo.bar.lala(:lala_1)")
# test a dotted func off the engine itself
- self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho)")
+ self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho_1)")
# test None becomes NULL
- self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func, :my_func_1, NULL, :my_func_2)")
+ self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)")
# test pickling
- self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func, :my_func_1, NULL, :my_func_2)")
+ self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)")
# assert func raises AttributeError for __bases__ attribute, since its not a class
# fixes pydoc
@@ -723,16 +727,16 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
self.assert_compile(s,
"SELECT users.id, users.name, users.fullname "
"FROM users, (SELECT q, z, r "
- "FROM calculate(:x, :y)) AS c1, (SELECT q, z, r "
- "FROM calculate(:x_1, :y_1)) AS c2 "
+ "FROM calculate(:x_1, :y_2)) AS c1, (SELECT q, z, r "
+ "FROM calculate(:x_3, :y_4)) AS c2 "
"WHERE users.id BETWEEN c1.z AND c2.z"
- , checkparams={'y': 45, 'x': 17, 'y_1': 12, 'x_1': 5})
+ , checkparams={'y_2': 45, 'x_1': 17, 'y_4': 12, 'x_3': 5})
def testextract(self):
"""test the EXTRACT function"""
self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) FROM thirdtable")
- self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date, :to_date_1))")
+ self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date_1, :to_date_2))")
def testjoin(self):
self.assert_compile(
@@ -785,9 +789,9 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl
)
self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \
-FROM mytable WHERE mytable.myid = :mytable_myid UNION \
+FROM mytable WHERE mytable.myid = :mytable_myid_1 UNION \
SELECT mytable.myid, mytable.name, mytable.description \
-FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid")
+FROM mytable WHERE mytable.myid = :mytable_myid_2 ORDER BY mytable.myid")
self.assert_compile(
union(
@@ -828,8 +832,8 @@ FROM myothertable ORDER BY myid \
)
,
"SELECT mytable.myid, mytable.name, max(mytable.description) FROM mytable \
-WHERE mytable.name = :mytable_name GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \
-FROM mytable WHERE mytable.name = :mytable_name_1"
+WHERE mytable.name = :mytable_name_1 GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \
+FROM mytable WHERE mytable.name = :mytable_name_2"
)
def test_compound_select_grouping(self):
@@ -873,8 +877,8 @@ UNION SELECT mytable.myid FROM mytable"
self.assert_compile(query,
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \
-WHERE mytable.name = %(mytable_name)s OR mytable.myid = %(mytable_myid)s OR \
-myothertable.othername != %(myothertable_othername)s OR \
+WHERE mytable.name = %(mytable_name_1)s OR mytable.myid = %(mytable_myid_2)s OR \
+myothertable.othername != %(myothertable_othername_3)s OR \
EXISTS (select yay from foo where boo = lar)",
dialect=postgres.dialect()
)
@@ -918,10 +922,10 @@ EXISTS (select yay from foo where boo = lar)",
),
(
select([table1], or_(table1.c.myid==bindparam('myid', unique=True), table2.c.otherid==bindparam('myid', unique=True))),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid_1",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?",
- {'myid':None, 'myid_1':None}, [None, None],
- {'myid':5, 'myid_1': 6}, {'myid':5, 'myid_1':6}, [5,6]
+ {'myid_1':None, 'myid_2':None}, [None, None],
+ {'myid_1':5, 'myid_2': 6}, {'myid_1':5, 'myid_2':6}, [5,6]
),
(
bindparam('test', type_=String) + text("'hi'"),
@@ -939,10 +943,10 @@ EXISTS (select yay from foo where boo = lar)",
),
(
select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True), table2.c.otherid==bindparam('myid', value=8, unique=True))),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid_1",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?",
- {'myid':7, 'myid_1':8}, [7,8],
- {'myid':5, 'myid_1':6}, {'myid':5, 'myid_1':6}, [5,6]
+ {'myid_1':7, 'myid_2':8}, [7,8],
+ {'myid_1':5, 'myid_2':6}, {'myid_1':5, 'myid_2':6}, [5,6]
),
]:
@@ -952,7 +956,7 @@ EXISTS (select yay from foo where boo = lar)",
positional = stmt.compile(dialect=sqlite.dialect())
pp = positional.get_params()
assert [pp[k] for k in positional.positiontup] == expected_default_params_list
- assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict).get_raw_dict()))
+ assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict)))
pp = positional.get_params(**test_param_dict)
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
@@ -966,12 +970,12 @@ EXISTS (select yay from foo where boo = lar)",
# check that conflicts with "unique" params are caught
- s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid')))
+ s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid_1')))
try:
- str(s)
+ print str(s)
assert False
except exceptions.CompileError, err:
- assert str(err) == "Bind parameter 'mytable_myid' conflicts with unique bind parameter of the same name"
+ assert str(err) == "Bind parameter 'mytable_myid_1' conflicts with unique bind parameter of the same name"
s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('mytable_myid_1')))
try:
@@ -990,40 +994,40 @@ EXISTS (select yay from foo where boo = lar)",
def testin(self):
self.assert_compile(select([table1], table1.c.myid.in_(['a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)")
self.assert_compile(select([table1], ~table1.c.myid.in_(['a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (:mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (:mytable_myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_(iter(['a', 'b']))),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a')])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), literal('b')])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1) + 'a'])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 + :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') +'a', 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid_3)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') + literal('a'), literal('b')])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :param_3)")
self.assert_compile(select([table1], table1.c.myid.in_([1, literal(3) + 4])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1 + :param_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :param_2 + :param_3)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') < 'b'])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 < :param_2)")
@@ -1032,19 +1036,19 @@ EXISTS (select yay from foo where boo = lar)",
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid +'a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1), 'a' + table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid + mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid_2 + mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([1, 2, 3])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1, :mytable_myid_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2, :mytable_myid_3)")
self.assert_compile(select([table1], table1.c.myid.in_(select([table2.c.otherid]))),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (SELECT myothertable.otherid FROM myothertable)")
@@ -1059,8 +1063,8 @@ EXISTS (select yay from foo where boo = lar)",
)
)), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable \
WHERE mytable.myid IN (\
-SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid \
-UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1)")
+SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 \
+UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_2)")
# test that putting a select in an IN clause does not blow away its ORDER BY clause
self.assert_compile(
@@ -1079,13 +1083,13 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
def test_in_deprecated_api(self):
self.assert_compile(select([table1], table1.c.myid.in_('abc')),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_(1)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_(1,2)),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)")
self.assert_compile(select([table1], table1.c.myid.in_()),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
@@ -1130,31 +1134,33 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
import datetime
table = Table('dt', metadata,
Column('date', Date))
- self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)})
+ self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))),
+ "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date_1 AND :dt_date_2", checkparams={'dt_date_1':datetime.date(2006,6,1), 'dt_date_2':datetime.date(2006,6,5)})
- self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)})
+ self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))),
+ "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)})
def test_operator_precedence(self):
table = Table('op', metadata,
Column('field', Integer))
self.assert_compile(table.select((table.c.field == 5) == None),
- "SELECT op.field FROM op WHERE (op.field = :op_field) IS NULL")
+ "SELECT op.field FROM op WHERE (op.field = :op_field_1) IS NULL")
self.assert_compile(table.select((table.c.field + 5) == table.c.field),
- "SELECT op.field FROM op WHERE op.field + :op_field = op.field")
+ "SELECT op.field FROM op WHERE op.field + :op_field_1 = op.field")
self.assert_compile(table.select((table.c.field + 5) * 6),
- "SELECT op.field FROM op WHERE (op.field + :op_field) * :param_1")
+ "SELECT op.field FROM op WHERE (op.field + :op_field_1) * :param_2")
self.assert_compile(table.select((table.c.field * 5) + 6),
- "SELECT op.field FROM op WHERE op.field * :op_field + :param_1")
+ "SELECT op.field FROM op WHERE op.field * :op_field_1 + :param_2")
self.assert_compile(table.select(5 + table.c.field.in_([5,6])),
- "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field, :op_field_1))")
+ "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field_2, :op_field_3))")
self.assert_compile(table.select((5 + table.c.field).in_([5,6])),
- "SELECT op.field FROM op WHERE :op_field + op.field IN (:param_1, :param_2)")
+ "SELECT op.field FROM op WHERE :op_field_1 + op.field IN (:param_2, :param_3)")
self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))),
- "SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)")
+ "SELECT op.field FROM op WHERE NOT (op.field = :op_field_1 AND op.field = :op_field_2)")
self.assert_compile(table.select(not_(table.c.field == 5)),
- "SELECT op.field FROM op WHERE op.field != :op_field")
+ "SELECT op.field FROM op WHERE op.field != :op_field_1")
self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
- "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field AND :op_field_1)")
+ "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field_1 AND :op_field_2)")
self.assert_compile(table.select(not_(table.c.field) == 5),
"SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)),
@@ -1204,16 +1210,16 @@ class CRUDTest(SQLCompileTest):
self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())")
def testupdate(self):
- self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table1.c.name:'fred'})
- self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'})
+ self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {table1.c.name:'fred'})
+ self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {'name':'fred'})
self.assert_compile(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid")
self.assert_compile(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'}, checkparams={'crit':'notthere', 'name':'hi'})
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}, checkparams={'description':'test', 'mytable_myid':12})
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'mytable_myid': 12, 'myid': 9, 'description': 'test'})
- self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :mytable_myid", params={'myid':18}, checkparams={'myid':18, 'mytable_myid':12})
+ self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid_1", params = {'description':'test'}, checkparams={'description':'test', 'mytable_myid_1':12})
+ self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid_1", params = {'mytable_myid_1': 12, 'myid': 9, 'description': 'test'})
+ self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :mytable_myid_1", params={'myid':18}, checkparams={'myid':18, 'mytable_myid_1':12})
s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
c = s.compile(column_keys=['mytable_id', 'name'])
- self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name), description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'})
+ self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name_1), description=:description WHERE mytable.myid = :mytable_myid_2", params = {'description':'test'})
self.assert_(str(s) == str(c))
def testupdateexpression(self):
@@ -1223,8 +1229,8 @@ class CRUDTest(SQLCompileTest):
values = {
table1.c.name : table1.c.name + "lala",
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name) "
- "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :param_2 || mytable.name || :param_3")
+ }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name_2) "
+ "WHERE mytable.myid = hoho(:hoho_3) AND mytable.name = :param_4 || mytable.name || :param_5")
def testcorrelatedupdate(self):
# test against a straight text subquery
@@ -1238,26 +1244,29 @@ class CRUDTest(SQLCompileTest):
# test against a regular constructed subquery
s = select([table2], table2.c.otherid == table1.c.myid)
u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
- self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name")
+ self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name_1")
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
u = update(table1, table1.c.name==s)
- self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)")
+ self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\
+ "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid_1)")
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
u = table1.update(table1.c.name==s)
- self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")
+ self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\
+ "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")
def testdelete(self):
- self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid")
+ self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid_1")
def testcorrelateddelete(self):
# test a non-correlated WHERE clause
s = select([table2.c.othername], table2.c.otherid == 7)
u = delete(table1, table1.c.name==s)
- self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)")
+ self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "\
+ "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid_1)")
# test one that is actually correlated...
s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
@@ -1275,7 +1284,7 @@ class InlineDefaultTest(SQLCompileTest):
Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])),
)
- self.assert_compile(t.insert(inline=True, values={}), "INSERT INTO test (col1, col2) VALUES (foo(:foo), (SELECT coalesce(max(foo.id)) FROM foo))")
+ self.assert_compile(t.insert(inline=True, values={}), "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), (SELECT coalesce(max(foo.id)) FROM foo))")
def test_update(self):
m = MetaData()
@@ -1288,27 +1297,34 @@ class InlineDefaultTest(SQLCompileTest):
Column('col3', String(30))
)
- self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo), col2=(SELECT coalesce(max(foo.id)) FROM foo), col3=:col3")
+ self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo_1), col2=(SELECT coalesce(max(foo.id)) FROM foo), col3=:col3")
class SchemaTest(SQLCompileTest):
def testselect(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
- self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value")
+ self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
+ "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE "\
+ "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_2")
s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'))
s.use_labels = True
- self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value AS remotetable_value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value")
+ self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value "\
+ "AS remotetable_value FROM remote_owner.remotetable WHERE "\
+ "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_2")
def testalias(self):
a = alias(table4, 'remtable')
- self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable WHERE remtable.datatype_id = :remtable_datatype_id")
+ self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable "\
+ "WHERE remtable.datatype_id = :remtable_datatype_id_1")
def testupdate(self):
- self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id WHERE remotetable.value = :remotetable_value")
+ self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\
+ "WHERE remotetable.value = :remotetable_value_1")
def testinsert(self):
- self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES (:rem_id, :datatype_id, :value)")
+ self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\
+ "(:rem_id, :datatype_id, :value)")
if __name__ == "__main__":
testbase.main()
diff --git a/test/sql/selectable.py b/test/sql/selectable.py
index a862b0bdf..49a61bf2b 100755
--- a/test/sql/selectable.py
+++ b/test/sql/selectable.py
@@ -264,7 +264,7 @@ class PrimaryKeyTest(AssertMixin):
b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True))
j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))
- assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j)
+ assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x_1", str(j)
assert list(j.primary_key) == [a.c.id, b.c.x]
class DerivedTest(AssertMixin):