summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-08-17 17:24:27 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-09-16 12:31:05 -0400
commit7e864fc7b1b950760cbf02e6dcd5aa5aac267400 (patch)
tree2382b016c3eb82ae463719cc948f45ccd2a226a2 /test/sql
parent8ebc8184392e20727748cd1405245e527f17e111 (diff)
downloadsqlalchemy-7e864fc7b1b950760cbf02e6dcd5aa5aac267400.tar.gz
Create a framework to allow all SQLALCHEMY_WARN_20 to pass
As the test suite has widespread use of many patterns that are deprecated, enable SQLALCHEMY_WARN_20 globally for the test suite but then break the warnings filter out into a whole list of all the individual warnings we are looking for. this way individual changesets can target a specific class of warning, as many of these warnings will indivdidually affect dozens of files and potentially hundreds of lines of code. Many warnings are also resolved here as this patch started out that way. From this point forward there should be changesets that target a subset of the warnings at a time. For expediency, updates some migration 2.0 docs for ORM as well. Change-Id: I98b8defdf7c37b818b3824d02f7668e3f5f31c94
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_case_statement.py41
-rw-r--r--test/sql/test_defaults.py23
-rw-r--r--test/sql/test_delete.py6
-rw-r--r--test/sql/test_deprecations.py261
-rw-r--r--test/sql/test_external_traversal.py14
-rw-r--r--test/sql/test_functions.py33
-rw-r--r--test/sql/test_insert.py14
-rw-r--r--test/sql/test_insert_exec.py192
-rw-r--r--test/sql/test_labels.py64
-rw-r--r--test/sql/test_returning.py4
-rw-r--r--test/sql/test_update.py237
11 files changed, 550 insertions, 339 deletions
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py
index 74fe8876d..4bef1df7f 100644
--- a/test/sql/test_case_statement.py
+++ b/test/sql/test_case_statement.py
@@ -27,7 +27,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
@classmethod
def setup_class(cls):
- metadata = MetaData(testing.db)
+ metadata = MetaData()
global info_table
info_table = Table(
"infos",
@@ -36,24 +36,29 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
Column("info", String(30)),
)
- info_table.create()
-
- info_table.insert().execute(
- {"pk": 1, "info": "pk_1_data"},
- {"pk": 2, "info": "pk_2_data"},
- {"pk": 3, "info": "pk_3_data"},
- {"pk": 4, "info": "pk_4_data"},
- {"pk": 5, "info": "pk_5_data"},
- {"pk": 6, "info": "pk_6_data"},
- )
+ with testing.db.begin() as conn:
+ info_table.create(conn)
+
+ conn.execute(
+ info_table.insert(),
+ [
+ {"pk": 1, "info": "pk_1_data"},
+ {"pk": 2, "info": "pk_2_data"},
+ {"pk": 3, "info": "pk_3_data"},
+ {"pk": 4, "info": "pk_4_data"},
+ {"pk": 5, "info": "pk_5_data"},
+ {"pk": 6, "info": "pk_6_data"},
+ ],
+ )
@classmethod
def teardown_class(cls):
- info_table.drop()
+ with testing.db.begin() as conn:
+ info_table.drop(conn)
@testing.fails_on("firebird", "FIXME: unknown")
@testing.requires.subqueries
- def test_case(self):
+ def test_case(self, connection):
inner = select(
case(
(info_table.c.pk < 3, "lessthan3"),
@@ -63,7 +68,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
info_table.c.info,
).select_from(info_table)
- inner_result = inner.execute().fetchall()
+ inner_result = connection.execute(inner).all()
# Outputs:
# lessthan3 1 pk_1_data
@@ -86,7 +91,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
outer = select(inner.alias("q_inner"))
- outer_result = outer.execute().fetchall()
+ outer_result = connection.execute(outer).all()
assert outer_result == [
("lessthan3", 1, "pk_1_data"),
@@ -107,7 +112,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
info_table.c.info,
).select_from(info_table)
- else_result = w_else.execute().fetchall()
+ else_result = connection.execute(w_else).all()
eq_(
else_result,
@@ -149,7 +154,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
"CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END",
)
- def test_text_doesnt_explode(self):
+ def test_text_doesnt_explode(self, connection):
for s in [
select(
@@ -169,7 +174,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
).order_by(info_table.c.info),
]:
eq_(
- s.execute().fetchall(),
+ connection.execute(s).all(),
[("no",), ("no",), ("no",), ("yes",), ("no",), ("no",)],
)
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index f9bec7914..6cb1c3841 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -351,9 +351,9 @@ class DefaultObjectTest(fixtures.TestBase):
assert_raises_message(
sa.exc.ArgumentError,
r"SQL expression for WHERE/HAVING role expected, "
- r"got \[(?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)\]",
- t.select,
- [const],
+ r"got (?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)",
+ t.select().where,
+ const,
)
assert_raises_message(
sa.exc.ArgumentError,
@@ -1006,29 +1006,31 @@ class PKIncrementTest(fixtures.TablesTest):
# TODO: add coverage for increment on a secondary column in a key
@testing.fails_on("firebird", "Data type unknown")
- def _test_autoincrement(self, bind):
+ def _test_autoincrement(self, connection):
aitable = self.tables.aitable
ids = set()
- rs = bind.execute(aitable.insert(), int1=1)
+ rs = connection.execute(aitable.insert(), int1=1)
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(), str1="row 2")
+ rs = connection.execute(aitable.insert(), str1="row 2")
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(), int1=3, str1="row 3")
+ rs = connection.execute(aitable.insert(), int1=3, str1="row 3")
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(values={"int1": func.length("four")}))
+ rs = connection.execute(
+ aitable.insert().values({"int1": func.length("four")})
+ )
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
@@ -1045,7 +1047,7 @@ class PKIncrementTest(fixtures.TablesTest):
)
eq_(
- list(bind.execute(aitable.select().order_by(aitable.c.id))),
+ list(connection.execute(aitable.select().order_by(aitable.c.id))),
[
(testing.db.dialect.default_sequence_base, 1, None),
(testing.db.dialect.default_sequence_base + 1, None, "row 2"),
@@ -1055,7 +1057,8 @@ class PKIncrementTest(fixtures.TablesTest):
)
def test_autoincrement_autocommit(self):
- self._test_autoincrement(testing.db)
+ with testing.db.connect() as conn:
+ self._test_autoincrement(conn)
def test_autoincrement_transaction(self):
with testing.db.begin() as conn:
diff --git a/test/sql/test_delete.py b/test/sql/test_delete.py
index 2e3ba4245..934022560 100644
--- a/test/sql/test_delete.py
+++ b/test/sql/test_delete.py
@@ -57,7 +57,7 @@ class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- delete(table1, table1.c.myid == 7),
+ delete(table1).where(table1.c.myid == 7),
"DELETE FROM mytable WHERE mytable.myid = :myid_1",
)
@@ -118,7 +118,7 @@ class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test a non-correlated WHERE clause
s = select(table2.c.othername).where(table2.c.otherid == 7)
self.assert_compile(
- delete(table1, table1.c.name == s.scalar_subquery()),
+ delete(table1).where(table1.c.name == s.scalar_subquery()),
"DELETE FROM mytable "
"WHERE mytable.name = ("
"SELECT myothertable.othername "
@@ -133,7 +133,7 @@ class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test one that is actually correlated...
s = select(table2.c.othername).where(table2.c.otherid == table1.c.myid)
self.assert_compile(
- table1.delete(table1.c.name == s.scalar_subquery()),
+ table1.delete().where(table1.c.name == s.scalar_subquery()),
"DELETE FROM mytable "
"WHERE mytable.name = ("
"SELECT myothertable.othername "
diff --git a/test/sql/test_deprecations.py b/test/sql/test_deprecations.py
index a72974dbb..30e4338b5 100644
--- a/test/sql/test_deprecations.py
+++ b/test/sql/test_deprecations.py
@@ -1,5 +1,8 @@
#! coding: utf-8
+import itertools
+import random
+
from sqlalchemy import alias
from sqlalchemy import and_
from sqlalchemy import bindparam
@@ -28,9 +31,11 @@ from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
from sqlalchemy.sql import coercions
+from sqlalchemy.sql import literal
from sqlalchemy.sql import operators
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import roles
+from sqlalchemy.sql import update
from sqlalchemy.sql import visitors
from sqlalchemy.sql.selectable import SelectStatementGrouping
from sqlalchemy.testing import assert_raises
@@ -46,6 +51,7 @@ from sqlalchemy.testing import mock
from sqlalchemy.testing import not_in
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+from .test_update import _UpdateFromTestBase
class ToMetaDataTest(fixtures.TestBase):
@@ -1675,7 +1681,7 @@ class DefaultTest(fixtures.TestBase):
eq_(conn.execute(select(table)).first(), (5, 12))
-class DMLTest(fixtures.TestBase, AssertsCompiledSQL):
+class DMLTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
__dialect__ = "default"
def test_insert_inline_kw_defaults(self):
@@ -1770,6 +1776,259 @@ class DMLTest(fixtures.TestBase, AssertsCompiledSQL):
stmt, "UPDATE foo SET bar=%s LIMIT 10", dialect="mysql"
)
+ def test_update_whereclause(self):
+ table1 = table(
+ "mytable", Column("myid", Integer), Column("name", String(30)),
+ )
+
+ with testing.expect_deprecated_20(
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.update(table1.c.myid == 7),
+ "UPDATE mytable SET myid=:myid, name=:name "
+ "WHERE mytable.myid = :myid_1",
+ )
+
+ def test_update_values(self):
+ table1 = table(
+ "mytable", Column("myid", Integer), Column("name", String(30)),
+ )
+
+ with testing.expect_deprecated_20(
+ "The update.values parameter will be removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.update(values={table1.c.myid: 7}),
+ "UPDATE mytable SET myid=:myid",
+ )
+
+ def test_delete_whereclause(self):
+ table1 = table("mytable", Column("myid", Integer),)
+
+ with testing.expect_deprecated_20(
+ "The delete.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.delete(table1.c.myid == 7),
+ "DELETE FROM mytable WHERE mytable.myid = :myid_1",
+ )
+
+ def test_update_ordered_parameters_fire_onupdate(self):
+ table = self.tables.update_w_default
+
+ values = [(table.c.y, table.c.x + 5), ("x", 10)]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ self.assert_compile(
+ table.update(preserve_parameter_order=True).values(values),
+ "UPDATE update_w_default "
+ "SET ycol=(update_w_default.x + :x_1), "
+ "x=:x, data=:data",
+ )
+
+ def test_update_ordered_parameters_override_onupdate(self):
+ table = self.tables.update_w_default
+
+ values = [
+ (table.c.y, table.c.x + 5),
+ (table.c.data, table.c.x + 10),
+ ("x", 10),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ self.assert_compile(
+ table.update(preserve_parameter_order=True).values(values),
+ "UPDATE update_w_default "
+ "SET ycol=(update_w_default.x + :x_1), "
+ "data=(update_w_default.x + :x_2), x=:x",
+ )
+
+ def test_update_ordered_parameters_oldstyle_1(self):
+ table1 = self.tables.mytable
+
+ # Confirm that we can pass values as list value pairs
+ # note these are ordered *differently* from table.c
+ values = [
+ (table1.c.name, table1.c.name + "lala"),
+ (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0.",
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0",
+ "The update.values parameter will be removed in SQLAlchemy 2.0",
+ ):
+ self.assert_compile(
+ update(
+ table1,
+ (table1.c.myid == func.hoho(4))
+ & (
+ table1.c.name
+ == literal("foo") + table1.c.name + literal("lala")
+ ),
+ preserve_parameter_order=True,
+ values=values,
+ ),
+ "UPDATE mytable "
+ "SET "
+ "name=(mytable.name || :name_1), "
+ "myid=do_stuff(mytable.myid, :param_1) "
+ "WHERE "
+ "mytable.myid = hoho(:hoho_1) AND "
+ "mytable.name = :param_2 || mytable.name || :param_3",
+ )
+
+ def test_update_ordered_parameters_oldstyle_2(self):
+ table1 = self.tables.mytable
+
+ # Confirm that we can pass values as list value pairs
+ # note these are ordered *differently* from table.c
+ values = [
+ (table1.c.name, table1.c.name + "lala"),
+ ("description", "some desc"),
+ (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0.",
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0",
+ ):
+ self.assert_compile(
+ update(
+ table1,
+ (table1.c.myid == func.hoho(4))
+ & (
+ table1.c.name
+ == literal("foo") + table1.c.name + literal("lala")
+ ),
+ preserve_parameter_order=True,
+ ).values(values),
+ "UPDATE mytable "
+ "SET "
+ "name=(mytable.name || :name_1), "
+ "description=:description, "
+ "myid=do_stuff(mytable.myid, :param_1) "
+ "WHERE "
+ "mytable.myid = hoho(:hoho_1) AND "
+ "mytable.name = :param_2 || mytable.name || :param_3",
+ )
+
+ def test_update_preserve_order_reqs_listtups(self):
+ table1 = self.tables.mytable
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ testing.assert_raises_message(
+ ValueError,
+ r"When preserve_parameter_order is True, values\(\) "
+ r"only accepts a list of 2-tuples",
+ table1.update(preserve_parameter_order=True).values,
+ {"description": "foo", "name": "bar"},
+ )
+
+ @testing.fixture
+ def randomized_param_order_update(self):
+ from sqlalchemy.sql.dml import UpdateDMLState
+
+ super_process_ordered_values = UpdateDMLState._process_ordered_values
+
+ # this fixture is needed for Python 3.6 and above to work around
+ # dictionaries being insert-ordered. in python 2.7 the previous
+ # logic fails pretty easily without this fixture.
+ def _process_ordered_values(self, statement):
+ super_process_ordered_values(self, statement)
+
+ tuples = list(self._dict_parameters.items())
+ random.shuffle(tuples)
+ self._dict_parameters = dict(tuples)
+
+ dialect = default.StrCompileDialect()
+ dialect.paramstyle = "qmark"
+ dialect.positional = True
+
+ with mock.patch.object(
+ UpdateDMLState, "_process_ordered_values", _process_ordered_values
+ ):
+ yield
+
+ def random_update_order_parameters():
+ from sqlalchemy import ARRAY
+
+ t = table(
+ "foo",
+ column("data1", ARRAY(Integer)),
+ column("data2", ARRAY(Integer)),
+ column("data3", ARRAY(Integer)),
+ column("data4", ARRAY(Integer)),
+ )
+
+ idx_to_value = [
+ (t.c.data1, 5, 7),
+ (t.c.data2, 10, 18),
+ (t.c.data3, 8, 4),
+ (t.c.data4, 12, 14),
+ ]
+
+ def combinations():
+ while True:
+ random.shuffle(idx_to_value)
+ yield list(idx_to_value)
+
+ return testing.combinations(
+ *[
+ (t, combination)
+ for i, combination in zip(range(10), combinations())
+ ],
+ argnames="t, idx_to_value"
+ )
+
+ @random_update_order_parameters()
+ def test_update_to_expression_ppo(
+ self, randomized_param_order_update, t, idx_to_value
+ ):
+ dialect = default.StrCompileDialect()
+ dialect.paramstyle = "qmark"
+ dialect.positional = True
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ stmt = t.update(preserve_parameter_order=True).values(
+ [(col[idx], val) for col, idx, val in idx_to_value]
+ )
+
+ self.assert_compile(
+ stmt,
+ "UPDATE foo SET %s"
+ % (
+ ", ".join(
+ "%s[?]=?" % col.key for col, idx, val in idx_to_value
+ )
+ ),
+ dialect=dialect,
+ checkpositional=tuple(
+ itertools.chain.from_iterable(
+ (idx, val) for col, idx, val in idx_to_value
+ )
+ ),
+ )
+
class TableDeprecationTest(fixtures.TestBase):
def test_mustexists(self):
diff --git a/test/sql/test_external_traversal.py b/test/sql/test_external_traversal.py
index c67b45203..970c39cef 100644
--- a/test/sql/test_external_traversal.py
+++ b/test/sql/test_external_traversal.py
@@ -625,7 +625,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
assert sql_util.ClauseAdapter(u).traverse(t1) is u
- def test_binds(self):
+ def test_bindparams(self):
"""test that unique bindparams change their name upon clone()
to prevent conflicts"""
@@ -1513,7 +1513,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
- vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)),
+ vis.traverse(case((t1.c.col1 == 5, t1.c.col2), else_=t1.c.col1)),
"CASE WHEN (t1alias.col1 = :col1_1) THEN "
"t1alias.col2 ELSE t1alias.col1 END",
)
@@ -1523,7 +1523,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
vis.traverse(
- case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1)
+ case((5, t1.c.col2), value=t1.c.col1, else_=t1.c.col1)
),
"CASE t1alias.col1 WHEN :param_1 THEN "
"t1alias.col2 ELSE t1alias.col1 END",
@@ -2152,7 +2152,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
"VALUES (:col1, :col2, :col3)",
)
- i2 = t1.insert(prefixes=["squiznart"])
+ i2 = t1.insert().prefix_with("squiznart")
self.assert_compile(
i2,
"INSERT squiznart INTO table1 (col1, col2, col3) "
@@ -2223,7 +2223,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
)
def test_inline_values_single(self):
- i = t1.insert(values={"col1": 5})
+ i = t1.insert().values({"col1": 5})
compile_state = i._compile_state_factory(i, None)
@@ -2231,7 +2231,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
is_(compile_state._has_multi_parameters, False)
def test_inline_values_multi(self):
- i = t1.insert(values=[{"col1": 5}, {"col1": 6}])
+ i = t1.insert().values([{"col1": 5}, {"col1": 6}])
compile_state = i._compile_state_factory(i, None)
@@ -2354,7 +2354,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
)
def test_update_no_support_multi_constructor(self):
- stmt = t1.update(values=[{"col1": 5}, {"col1": 7}])
+ stmt = t1.update().values([{"col1": 5}, {"col1": 7}])
assert_raises_message(
exc.InvalidRequestError,
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index 73954a8af..3c6140b81 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -990,30 +990,32 @@ class ExecuteTest(fixtures.TestBase):
Column("stuff", String(20), onupdate="thisisstuff"),
)
meta.create_all(connection)
- connection.execute(t.insert(values=dict(value=func.length("one"))))
+ connection.execute(t.insert().values(value=func.length("one")))
eq_(connection.execute(t.select()).first().value, 3)
- connection.execute(t.update(values=dict(value=func.length("asfda"))))
+ connection.execute(t.update().values(value=func.length("asfda")))
eq_(connection.execute(t.select()).first().value, 5)
r = connection.execute(
- t.insert(values=dict(value=func.length("sfsaafsda")))
+ t.insert().values(value=func.length("sfsaafsda"))
)
id_ = r.inserted_primary_key[0]
- eq_(connection.execute(t.select(t.c.id == id_)).first().value, 9)
- connection.execute(t.update(values={t.c.value: func.length("asdf")}))
+ eq_(
+ connection.execute(t.select().where(t.c.id == id_)).first().value,
+ 9,
+ )
+ connection.execute(t.update().values({t.c.value: func.length("asdf")}))
eq_(connection.execute(t.select()).first().value, 4)
connection.execute(t2.insert())
- connection.execute(t2.insert(values=dict(value=func.length("one"))))
+ connection.execute(t2.insert().values(value=func.length("one")))
connection.execute(
- t2.insert(values=dict(value=func.length("asfda") + -19)),
- stuff="hi",
+ t2.insert().values(value=func.length("asfda") + -19), stuff="hi",
)
res = sorted(connection.execute(select(t2.c.value, t2.c.stuff)))
eq_(res, [(-14, "hi"), (3, None), (7, None)])
connection.execute(
- t2.update(values=dict(value=func.length("asdsafasd"))),
+ t2.update().values(value=func.length("asdsafasd")),
stuff="some stuff",
)
eq_(
@@ -1023,23 +1025,18 @@ class ExecuteTest(fixtures.TestBase):
connection.execute(t2.delete())
- connection.execute(
- t2.insert(values=dict(value=func.length("one") + 8))
- )
+ connection.execute(t2.insert().values(value=func.length("one") + 8))
eq_(connection.execute(t2.select()).first().value, 11)
- connection.execute(t2.update(values=dict(value=func.length("asfda"))))
+ connection.execute(t2.update().values(value=func.length("asfda")))
eq_(
connection.execute(select(t2.c.value, t2.c.stuff)).first(),
(5, "thisisstuff"),
)
connection.execute(
- t2.update(
- values={
- t2.c.value: func.length("asfdaasdf"),
- t2.c.stuff: "foo",
- }
+ t2.update().values(
+ {t2.c.value: func.length("asfdaasdf"), t2.c.stuff: "foo"}
)
)
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index 3cdf291ec..541ffc4e9 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -164,7 +164,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams = {"myid": 3, "name": "jack"}
self.assert_compile(
- insert(table1, dict(myid=3, name="jack")),
+ insert(table1).values(myid=3, name="jack"),
"INSERT INTO mytable (myid, name) VALUES (:myid, :name)",
checkparams=checkparams,
)
@@ -194,7 +194,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams = {"myid": 3, "name": "jack", "unknowncol": "oops"}
- stmt = insert(table1, values=checkparams)
+ stmt = insert(table1).values(checkparams)
assert_raises_message(
exc.CompileError,
"Unconsumed column names: unknowncol",
@@ -210,7 +210,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
{"myid": 4, "name": "someone", "unknowncol": "oops"},
]
- stmt = insert(table1, values=checkparams)
+ stmt = insert(table1).values(checkparams)
assert_raises_message(
exc.CompileError,
"Unconsumed column names: unknowncol",
@@ -228,7 +228,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
}
self.assert_compile(
- insert(table1, (3, "jack", "mydescription")),
+ insert(table1).values([3, "jack", "mydescription"]),
"INSERT INTO mytable (myid, name, description) "
"VALUES (:myid, :name, :description)",
checkparams=checkparams,
@@ -238,7 +238,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- insert(table1, values=dict(myid=func.lala())),
+ insert(table1).values(myid=func.lala()),
"INSERT INTO mytable (myid) VALUES (lala())",
)
@@ -251,7 +251,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
}
self.assert_compile(
- insert(table1, values),
+ insert(table1).values(values),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
)
@@ -262,7 +262,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
values2 = {table1.c.name: bindparam("username")}
self.assert_compile(
- insert(table1, values=values1).values(values2),
+ insert(table1).values(values1).values(values2),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
)
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
index c4266603e..45a8bccf5 100644
--- a/test/sql/test_insert_exec.py
+++ b/test/sql/test_insert_exec.py
@@ -36,23 +36,29 @@ class InsertExecTest(fixtures.TablesTest):
)
@testing.requires.multivalues_inserts
- def test_multivalues_insert(self):
+ def test_multivalues_insert(self, connection):
users = self.tables.users
- users.insert(
- values=[
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- ]
- ).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ connection.execute(
+ users.insert().values(
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ ]
+ )
+ )
+ rows = connection.execute(
+ users.select().order_by(users.c.user_id)
+ ).all()
eq_(rows[0], (7, "jack"))
eq_(rows[1], (8, "ed"))
- users.insert(values=[(9, "jack"), (10, "ed")]).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ connection.execute(users.insert().values([(9, "jack"), (10, "ed")]))
+ rows = connection.execute(
+ users.select().order_by(users.c.user_id)
+ ).all()
eq_(rows[2], (9, "jack"))
eq_(rows[3], (10, "ed"))
- def test_insert_heterogeneous_params(self):
+ def test_insert_heterogeneous_params(self, connection):
"""test that executemany parameters are asserted to match the
parameter set of the first."""
users = self.tables.users
@@ -63,16 +69,22 @@ class InsertExecTest(fixtures.TablesTest):
"bind parameter 'user_name', in "
"parameter group 2\n"
r"\[SQL: u?INSERT INTO users",
- users.insert().execute,
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9},
+ connection.execute,
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9},
+ ],
)
# this succeeds however. We aren't yet doing
# a length check on all subsequent parameters.
- users.insert().execute(
- {"user_id": 7}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9}
+ connection.execute(
+ users.insert(),
+ {"user_id": 7},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9},
)
def _test_lastrow_accessor(self, table_, values, assertvalues):
@@ -94,26 +106,29 @@ class InsertExecTest(fixtures.TablesTest):
):
is_(bool(comp.returning), True)
- result = engine.execute(table_.insert(), **values)
- ret = values.copy()
-
- for col, id_ in zip(
- table_.primary_key, result.inserted_primary_key
- ):
- ret[col.key] = id_
-
- if result.lastrow_has_defaults():
- criterion = and_(
- *[
- col == id_
- for col, id_ in zip(
- table_.primary_key, result.inserted_primary_key
- )
- ]
- )
- row = engine.execute(table_.select(criterion)).first()
- for c in table_.c:
- ret[c.key] = row._mapping[c]
+ with engine.begin() as connection:
+ result = connection.execute(table_.insert(), **values)
+ ret = values.copy()
+
+ for col, id_ in zip(
+ table_.primary_key, result.inserted_primary_key
+ ):
+ ret[col.key] = id_
+
+ if result.lastrow_has_defaults():
+ criterion = and_(
+ *[
+ col == id_
+ for col, id_ in zip(
+ table_.primary_key, result.inserted_primary_key
+ )
+ ]
+ )
+ row = connection.execute(
+ table_.select().where(criterion)
+ ).first()
+ for c in table_.c:
+ ret[c.key] = row._mapping[c]
return ret
if testing.against("firebird", "postgresql", "oracle", "mssql"):
@@ -275,15 +290,16 @@ class InsertExecTest(fixtures.TablesTest):
Column("x", Integer, primary_key=True),
Column("y", Integer),
)
- t.create(eng)
- r = eng.execute(t.insert().values(y=5))
- eq_(r.inserted_primary_key, (0,))
+ with eng.begin() as conn:
+ t.create(conn)
+ r = conn.execute(t.insert().values(y=5))
+ eq_(r.inserted_primary_key, (0,))
@testing.fails_on(
"sqlite", "sqlite autoincrement doesn't work with composite pks"
)
@testing.provide_metadata
- def test_misordered_lastrow(self):
+ def test_misordered_lastrow(self, connection):
metadata = self.metadata
related = Table(
@@ -312,37 +328,39 @@ class InsertExecTest(fixtures.TablesTest):
mariadb_engine="MyISAM",
)
- metadata.create_all()
- r = related.insert().values(id=12).execute()
+ metadata.create_all(connection)
+ r = connection.execute(related.insert().values(id=12))
id_ = r.inserted_primary_key[0]
eq_(id_, 12)
- r = t6.insert().values(manual_id=id_).execute()
+ r = connection.execute(t6.insert().values(manual_id=id_))
eq_(r.inserted_primary_key, (12, 1))
- def test_implicit_id_insert_select_columns(self):
+ def test_implicit_id_insert_select_columns(self, connection):
users = self.tables.users
stmt = users.insert().from_select(
(users.c.user_id, users.c.user_name),
users.select().where(users.c.user_id == 20),
)
- testing.db.execute(stmt)
+ r = connection.execute(stmt)
+ eq_(r.inserted_primary_key, (None,))
- def test_implicit_id_insert_select_keys(self):
+ def test_implicit_id_insert_select_keys(self, connection):
users = self.tables.users
stmt = users.insert().from_select(
["user_id", "user_name"],
users.select().where(users.c.user_id == 20),
)
- testing.db.execute(stmt)
+ r = connection.execute(stmt)
+ eq_(r.inserted_primary_key, (None,))
@testing.requires.empty_inserts
@testing.requires.returning
- def test_no_inserted_pk_on_returning(self):
+ def test_no_inserted_pk_on_returning(self, connection):
users = self.tables.users
- result = testing.db.execute(
+ result = connection.execute(
users.insert().returning(users.c.user_id, users.c.user_name)
)
assert_raises_message(
@@ -399,52 +417,60 @@ class TableInsertTest(fixtures.TablesTest):
return t
def _test(
- self, stmt, row, returning=None, inserted_primary_key=False, table=None
+ self,
+ connection,
+ stmt,
+ row,
+ returning=None,
+ inserted_primary_key=False,
+ table=None,
):
- with testing.db.connect() as conn:
- r = conn.execute(stmt)
+ r = connection.execute(stmt)
- if returning:
- returned = r.first()
- eq_(returned, returning)
- elif inserted_primary_key is not False:
- eq_(r.inserted_primary_key, inserted_primary_key)
+ if returning:
+ returned = r.first()
+ eq_(returned, returning)
+ elif inserted_primary_key is not False:
+ eq_(r.inserted_primary_key, inserted_primary_key)
- if table is None:
- table = self.tables.foo
+ if table is None:
+ table = self.tables.foo
- eq_(conn.execute(table.select()).first(), row)
+ eq_(connection.execute(table.select()).first(), row)
- def _test_multi(self, stmt, rows, data):
- testing.db.execute(stmt, rows)
+ def _test_multi(self, connection, stmt, rows, data):
+ connection.execute(stmt, rows)
eq_(
- testing.db.execute(
+ connection.execute(
self.tables.foo.select().order_by(self.tables.foo.c.id)
- ).fetchall(),
+ ).all(),
data,
)
@testing.requires.sequences
- def test_explicit_sequence(self):
+ def test_explicit_sequence(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(
id=func.next_value(Sequence("t_id_seq")), data="data", x=5
),
(testing.db.dialect.default_sequence_base, "data", 5),
)
- def test_uppercase(self):
+ def test_uppercase(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
)
- def test_uppercase_inline(self):
+ def test_uppercase_inline(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().inline().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
@@ -454,64 +480,71 @@ class TableInsertTest(fixtures.TablesTest):
"mssql+pyodbc",
"Pyodbc + SQL Server + Py3K, some decimal handling issue",
)
- def test_uppercase_inline_implicit(self):
+ def test_uppercase_inline_implicit(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().inline().values(data="data", x=5),
(1, "data", 5),
inserted_primary_key=(None,),
)
- def test_uppercase_implicit(self):
+ def test_uppercase_implicit(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(testing.db.dialect.default_sequence_base,),
)
- def test_uppercase_direct_params(self):
+ def test_uppercase_direct_params(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
)
@testing.requires.returning
- def test_uppercase_direct_params_returning(self):
+ def test_uppercase_direct_params_returning(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5).returning(t.c.id, t.c.x),
(1, "data", 5),
returning=(1, 5),
)
@testing.requires.sql_expressions_inserted_as_primary_key
- def test_sql_expr_lastrowid(self):
+ def test_sql_expr_lastrowid(self, connection):
# see also test.orm.test_unitofwork.py
# ClauseAttributesTest.test_insert_pk_expression
t = self.tables.foo_no_seq
self._test(
+ connection,
t.insert().values(id=literal(5) + 10, data="data", x=5),
(15, "data", 5),
inserted_primary_key=(15,),
table=self.tables.foo_no_seq,
)
- def test_direct_params(self):
+ def test_direct_params(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(),
)
@testing.requires.returning
- def test_direct_params_returning(self):
+ def test_direct_params_returning(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5).returning(t.c.id, t.c.x),
(testing.db.dialect.default_sequence_base, "data", 5),
returning=(testing.db.dialect.default_sequence_base, 5),
@@ -523,9 +556,10 @@ class TableInsertTest(fixtures.TablesTest):
# does not indicate the Sequence
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk(self):
+ def test_implicit_pk(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(),
@@ -533,9 +567,10 @@ class TableInsertTest(fixtures.TablesTest):
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk_multi_rows(self):
+ def test_implicit_pk_multi_rows(self, connection):
t = self._fixture()
self._test_multi(
+ connection,
t.insert(),
[
{"data": "d1", "x": 5},
@@ -547,9 +582,10 @@ class TableInsertTest(fixtures.TablesTest):
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk_inline(self):
+ def test_implicit_pk_inline(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().inline().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(),
diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py
index 731f8fcb5..e3fdff806 100644
--- a/test/sql/test_labels.py
+++ b/test/sql/test_labels.py
@@ -273,15 +273,20 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
# version) generate a subquery for limits/offsets. ensure that the
# generated result map corresponds to the selected table, not the
# select query
- s = table1.select(
- use_labels=True, order_by=[table1.c.this_is_the_primarykey_column]
- ).limit(2)
+ s = (
+ table1.select()
+ .apply_labels()
+ .order_by(table1.c.this_is_the_primarykey_column)
+ .limit(2)
+ )
self._assert_labeled_table1_select(s)
def test_result_map_subquery(self):
table1 = self.table1
- s = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ s = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
s2 = select(s)
compiled = s2.compile(dialect=self._length_fixture())
@@ -302,7 +307,11 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
table1 = self.table1
dialect = self._length_fixture()
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
s = select(q).apply_labels()
self.assert_compile(
@@ -348,7 +357,7 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
def test_column_bind_labels_1(self):
table1 = self.table1
- s = table1.select(table1.c.this_is_the_primarykey_column == 4)
+ s = table1.select().where(table1.c.this_is_the_primarykey_column == 4)
self.assert_compile(
s,
"SELECT some_large_named_table.this_is_the_primarykey_column, "
@@ -375,7 +384,7 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
def test_column_bind_labels_2(self):
table1 = self.table1
- s = table1.select(
+ s = table1.select().where(
or_(
table1.c.this_is_the_primarykey_column == 4,
table1.c.this_is_the_primarykey_column == 2,
@@ -473,8 +482,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_1(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
compile_dialect = default.DefaultDialect(label_length=10)
@@ -501,8 +512,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_2(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
@@ -531,8 +544,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
table1 = self.table1
compile_dialect = default.DefaultDialect(label_length=4)
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
@@ -559,7 +574,11 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_4(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
x = select(q).apply_labels()
compile_dialect = default.DefaultDialect(label_length=10)
@@ -586,7 +605,11 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_5(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
x = select(q).apply_labels()
compile_dialect = default.DefaultDialect(label_length=4)
@@ -615,7 +638,8 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
table1 = self.table1
q = (
- table1.select(table1.c.this_is_the_primarykey_column == 4)
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
.apply_labels()
.alias("foo")
)
@@ -642,8 +666,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_result_schema_column_2(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index 2fb50f37d..26d4969c8 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -370,7 +370,7 @@ class ReturnDefaultsTest(fixtures.TablesTest):
def test_arg_insert_pk(self, connection):
t1 = self.tables.t1
result = connection.execute(
- t1.insert(return_defaults=[t1.c.insdef]).values(upddef=1)
+ t1.insert().return_defaults(t1.c.insdef).values(upddef=1)
)
eq_(
[
@@ -394,7 +394,7 @@ class ReturnDefaultsTest(fixtures.TablesTest):
t1 = self.tables.t1
connection.execute(t1.insert().values(upddef=1))
result = connection.execute(
- t1.update(return_defaults=[t1.c.upddef]).values(data="d1")
+ t1.update().return_defaults(t1.c.upddef).values(data="d1")
)
eq_(
[result.returned_defaults._mapping[k] for k in (t1.c.upddef,)], [1]
diff --git a/test/sql/test_update.py b/test/sql/test_update.py
index a5e57a78a..8be5868db 100644
--- a/test/sql/test_update.py
+++ b/test/sql/test_update.py
@@ -195,13 +195,12 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
# test against a straight text subquery
- u = update(
- table1,
- values={
+ u = update(table1).values(
+ {
table1.c.name: text(
"(select name from mytable where id=mytable.id)"
)
- },
+ }
)
self.assert_compile(
u,
@@ -213,13 +212,12 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
mt = table1.alias()
- u = update(
- table1,
- values={
+ u = update(table1).values(
+ {
table1.c.name: select(mt.c.name)
.where(mt.c.myid == table1.c.myid)
.scalar_subquery()
- },
+ }
)
self.assert_compile(
u,
@@ -238,7 +236,11 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
.where(table2.c.otherid == table1.c.myid)
.scalar_subquery()
)
- u = update(table1, table1.c.name == "jack", values={table1.c.name: s})
+ u = (
+ update(table1)
+ .where(table1.c.name == "jack")
+ .values({table1.c.name: s})
+ )
self.assert_compile(
u,
"UPDATE mytable SET name=(SELECT myothertable.otherid, "
@@ -253,7 +255,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test a non-correlated WHERE clause
s = select(table2.c.othername).where(table2.c.otherid == 7)
- u = update(table1, table1.c.name == s.scalar_subquery())
+ u = update(table1).where(table1.c.name == s.scalar_subquery())
self.assert_compile(
u,
"UPDATE mytable SET myid=:myid, name=:name, "
@@ -268,7 +270,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test one that is actually correlated...
s = select(table2.c.othername).where(table2.c.otherid == table1.c.myid)
- u = table1.update(table1.c.name == s.scalar_subquery())
+ u = table1.update().where(table1.c.name == s.scalar_subquery())
self.assert_compile(
u,
"UPDATE mytable SET myid=:myid, name=:name, "
@@ -420,7 +422,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 7),
+ update(table1).where(table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params={table1.c.name: "fred"},
)
@@ -440,7 +442,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 7),
+ update(table1).where(table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params={"name": "fred"},
)
@@ -449,7 +451,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, values={table1.c.name: table1.c.myid}),
+ update(table1).values({table1.c.name: table1.c.myid}),
"UPDATE mytable SET name=mytable.myid",
)
@@ -457,11 +459,9 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(
- table1,
- whereclause=table1.c.name == bindparam("crit"),
- values={table1.c.name: "hi"},
- ),
+ update(table1)
+ .where(table1.c.name == bindparam("crit"))
+ .values({table1.c.name: "hi"},),
"UPDATE mytable SET name=:name WHERE mytable.name = :crit",
params={"crit": "notthere"},
checkparams={"crit": "notthere", "name": "hi"},
@@ -471,11 +471,9 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(
- table1,
- table1.c.myid == 12,
- values={table1.c.name: table1.c.myid},
- ),
+ update(table1)
+ .where(table1.c.myid == 12)
+ .values({table1.c.name: table1.c.myid},),
"UPDATE mytable "
"SET name=mytable.myid, description=:description "
"WHERE mytable.myid = :myid_1",
@@ -487,7 +485,9 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 12, values={table1.c.myid: 9}),
+ update(table1)
+ .where(table1.c.myid == 12)
+ .values({table1.c.myid: 9}),
"UPDATE mytable "
"SET myid=:myid, description=:description "
"WHERE mytable.myid = :myid_1",
@@ -498,7 +498,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 12),
+ update(table1).where(table1.c.myid == 12),
"UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
params={"myid": 18},
checkparams={"myid": 18, "myid_1": 12},
@@ -507,7 +507,11 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
def test_update_9(self):
table1 = self.tables.mytable
- s = table1.update(table1.c.myid == 12, values={table1.c.name: "lala"})
+ s = (
+ table1.update()
+ .where(table1.c.myid == 12)
+ .values({table1.c.name: "lala"})
+ )
c = s.compile(column_keys=["id", "name"])
eq_(str(s), str(c))
@@ -517,7 +521,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
v1 = {table1.c.name: table1.c.myid}
v2 = {table1.c.name: table1.c.name + "foo"}
self.assert_compile(
- update(table1, table1.c.myid == 12, values=v1).values(v2),
+ update(table1).where(table1.c.myid == 12).values(v1).values(v2),
"UPDATE mytable "
"SET "
"name=(mytable.name || :name_1), "
@@ -535,15 +539,15 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
}
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
- ),
- values=values,
- ),
+ )
+ )
+ .values(values),
"UPDATE mytable "
"SET "
"myid=do_stuff(mytable.myid, :param_1), "
@@ -586,35 +590,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
column_keys=["j"],
)
- def test_update_ordered_parameters_oldstyle_1(self):
- table1 = self.tables.mytable
-
- # Confirm that we can pass values as list value pairs
- # note these are ordered *differently* from table.c
- values = [
- (table1.c.name, table1.c.name + "lala"),
- (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
- ]
- self.assert_compile(
- update(
- table1,
- (table1.c.myid == func.hoho(4))
- & (
- table1.c.name
- == literal("foo") + table1.c.name + literal("lala")
- ),
- preserve_parameter_order=True,
- values=values,
- ),
- "UPDATE mytable "
- "SET "
- "name=(mytable.name || :name_1), "
- "myid=do_stuff(mytable.myid, :param_1) "
- "WHERE "
- "mytable.myid = hoho(:hoho_1) AND "
- "mytable.name = :param_2 || mytable.name || :param_3",
- )
-
def test_update_ordered_parameters_newstyle_1(self):
table1 = self.tables.mytable
@@ -643,36 +618,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
"mytable.name = :param_2 || mytable.name || :param_3",
)
- def test_update_ordered_parameters_oldstyle_2(self):
- table1 = self.tables.mytable
-
- # Confirm that we can pass values as list value pairs
- # note these are ordered *differently* from table.c
- values = [
- (table1.c.name, table1.c.name + "lala"),
- ("description", "some desc"),
- (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
- ]
- self.assert_compile(
- update(
- table1,
- (table1.c.myid == func.hoho(4))
- & (
- table1.c.name
- == literal("foo") + table1.c.name + literal("lala")
- ),
- preserve_parameter_order=True,
- ).values(values),
- "UPDATE mytable "
- "SET "
- "name=(mytable.name || :name_1), "
- "description=:description, "
- "myid=do_stuff(mytable.myid, :param_1) "
- "WHERE "
- "mytable.myid = hoho(:hoho_1) AND "
- "mytable.name = :param_2 || mytable.name || :param_3",
- )
-
def test_update_ordered_parameters_newstyle_2(self):
table1 = self.tables.mytable
@@ -684,14 +629,15 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
(table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
]
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
),
- ).ordered_values(*values),
+ )
+ .ordered_values(*values),
"UPDATE mytable "
"SET "
"name=(mytable.name || :name_1), "
@@ -741,42 +687,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
stmt.compile,
)
- def test_update_ordered_parameters_fire_onupdate(self):
- table = self.tables.update_w_default
-
- values = [(table.c.y, table.c.x + 5), ("x", 10)]
-
- self.assert_compile(
- table.update(preserve_parameter_order=True).values(values),
- "UPDATE update_w_default SET ycol=(update_w_default.x + :x_1), "
- "x=:x, data=:data",
- )
-
- def test_update_ordered_parameters_override_onupdate(self):
- table = self.tables.update_w_default
-
- values = [
- (table.c.y, table.c.x + 5),
- (table.c.data, table.c.x + 10),
- ("x", 10),
- ]
-
- self.assert_compile(
- table.update(preserve_parameter_order=True).values(values),
- "UPDATE update_w_default SET ycol=(update_w_default.x + :x_1), "
- "data=(update_w_default.x + :x_2), x=:x",
- )
-
- def test_update_preserve_order_reqs_listtups(self):
- table1 = self.tables.mytable
- testing.assert_raises_message(
- ValueError,
- r"When preserve_parameter_order is True, values\(\) "
- r"only accepts a list of 2-tuples",
- table1.update(preserve_parameter_order=True).values,
- {"description": "foo", "name": "bar"},
- )
-
def test_update_ordereddict(self):
table1 = self.tables.mytable
@@ -790,15 +700,15 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
)
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
),
- values=values,
- ),
+ )
+ .values(values),
"UPDATE mytable "
"SET "
"myid=do_stuff(mytable.myid, :param_1), "
@@ -954,35 +864,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
),
)
- @random_update_order_parameters()
- def test_update_to_expression_ppo(
- self, randomized_param_order_update, t, idx_to_value
- ):
- dialect = default.StrCompileDialect()
- dialect.paramstyle = "qmark"
- dialect.positional = True
-
- # deprecated pattern here
- stmt = t.update(preserve_parameter_order=True).values(
- [(col[idx], val) for col, idx, val in idx_to_value]
- )
-
- self.assert_compile(
- stmt,
- "UPDATE foo SET %s"
- % (
- ", ".join(
- "%s[?]=?" % col.key for col, idx, val in idx_to_value
- )
- ),
- dialect=dialect,
- checkpositional=tuple(
- itertools.chain.from_iterable(
- (idx, val) for col, idx, val in idx_to_value
- )
- ),
- )
-
def test_update_to_expression_three(self):
# this test is from test_defaults but exercises a particular
# parameter ordering issue
@@ -1076,9 +957,9 @@ class UpdateFromCompileTest(
# against the alias, but we name the table-bound column
# in values. The behavior here isn't really defined
self.assert_compile(
- update(talias1, talias1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(talias1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1 "
"SET name=:name "
"WHERE t1.myid = :myid_1",
@@ -1093,9 +974,9 @@ class UpdateFromCompileTest(
# which is causing the "table1.c.name" param to be handled
# as an "extra table", hence we see the full table name rendered.
self.assert_compile(
- update(talias1, table1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(table1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1 "
"SET name=:mytable_name "
"FROM mytable "
@@ -1108,9 +989,9 @@ class UpdateFromCompileTest(
talias1 = table1.alias("t1")
self.assert_compile(
- update(talias1, table1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(table1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1, mytable SET mytable.name=%s "
"WHERE mytable.myid = %s",
checkparams={"mytable_name": "fred", "myid_1": 7},
@@ -1212,9 +1093,13 @@ class UpdateFromCompileTest(
checkparams = {"email_address_1": "e1", "id_1": 7, "name": "newname"}
- cols = [addresses.c.id, addresses.c.user_id, addresses.c.email_address]
-
- subq = select(cols).where(addresses.c.id == 7).alias()
+ subq = (
+ select(
+ addresses.c.id, addresses.c.user_id, addresses.c.email_address
+ )
+ .where(addresses.c.id == 7)
+ .alias()
+ )
self.assert_compile(
users.update()
.values(name="newname")