summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-08-17 17:24:27 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-09-16 12:31:05 -0400
commit7e864fc7b1b950760cbf02e6dcd5aa5aac267400 (patch)
tree2382b016c3eb82ae463719cc948f45ccd2a226a2 /test
parent8ebc8184392e20727748cd1405245e527f17e111 (diff)
downloadsqlalchemy-7e864fc7b1b950760cbf02e6dcd5aa5aac267400.tar.gz
Create a framework to allow all SQLALCHEMY_WARN_20 to pass
As the test suite has widespread use of many patterns that are deprecated, enable SQLALCHEMY_WARN_20 globally for the test suite but then break the warnings filter out into a whole list of all the individual warnings we are looking for. this way individual changesets can target a specific class of warning, as many of these warnings will indivdidually affect dozens of files and potentially hundreds of lines of code. Many warnings are also resolved here as this patch started out that way. From this point forward there should be changesets that target a subset of the warnings at a time. For expediency, updates some migration 2.0 docs for ORM as well. Change-Id: I98b8defdf7c37b818b3824d02f7668e3f5f31c94
Diffstat (limited to 'test')
-rw-r--r--test/dialect/postgresql/test_dialect.py36
-rw-r--r--test/dialect/postgresql/test_query.py40
-rw-r--r--test/dialect/postgresql/test_reflection.py2
-rw-r--r--test/dialect/postgresql/test_types.py49
-rw-r--r--test/engine/test_execute.py13
-rw-r--r--test/orm/inheritance/_poly_fixtures.py10
-rw-r--r--test/orm/inheritance/test_abc_inheritance.py505
-rw-r--r--test/orm/inheritance/test_abc_polymorphic.py2
-rw-r--r--test/orm/inheritance/test_assorted_poly.py152
-rw-r--r--test/orm/inheritance/test_basic.py170
-rw-r--r--test/orm/inheritance/test_concrete.py110
-rw-r--r--test/orm/inheritance/test_magazine.py6
-rw-r--r--test/orm/inheritance/test_manytomany.py4
-rw-r--r--test/orm/inheritance/test_poly_linked_list.py2
-rw-r--r--test/orm/inheritance/test_poly_persistence.py20
-rw-r--r--test/orm/inheritance/test_polymorphic_rel.py92
-rw-r--r--test/orm/inheritance/test_productspec.py2
-rw-r--r--test/orm/inheritance/test_relationship.py143
-rw-r--r--test/orm/inheritance/test_single.py68
-rw-r--r--test/orm/test_deprecations.py4
-rw-r--r--test/sql/test_case_statement.py41
-rw-r--r--test/sql/test_defaults.py23
-rw-r--r--test/sql/test_delete.py6
-rw-r--r--test/sql/test_deprecations.py261
-rw-r--r--test/sql/test_external_traversal.py14
-rw-r--r--test/sql/test_functions.py33
-rw-r--r--test/sql/test_insert.py14
-rw-r--r--test/sql/test_insert_exec.py192
-rw-r--r--test/sql/test_labels.py64
-rw-r--r--test/sql/test_returning.py4
-rw-r--r--test/sql/test_update.py237
31 files changed, 1322 insertions, 997 deletions
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index 13df1ce0f..971d4f12f 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -556,13 +556,17 @@ class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
t = self.tables.data
- ins = t.insert(inline=True).values(
- id=bindparam("id"),
- x=select(literal_column("5"))
- .select_from(self.tables.data)
- .scalar_subquery(),
- y=bindparam("y"),
- z=bindparam("z"),
+ ins = (
+ t.insert()
+ .inline()
+ .values(
+ id=bindparam("id"),
+ x=select(literal_column("5"))
+ .select_from(self.tables.data)
+ .scalar_subquery(),
+ y=bindparam("y"),
+ z=bindparam("z"),
+ )
)
# compiled SQL has a newline in it
eq_(
@@ -608,13 +612,17 @@ class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
t = self.tables.data
- ins = t.insert(inline=True).values(
- id=bindparam("id"),
- x=select(literal_column("5"))
- .select_from(self.tables.data)
- .scalar_subquery(),
- y=bindparam("y"),
- z=bindparam("z"),
+ ins = (
+ t.insert()
+ .inline()
+ .values(
+ id=bindparam("id"),
+ x=select(literal_column("5"))
+ .select_from(self.tables.data)
+ .scalar_subquery(),
+ y=bindparam("y"),
+ z=bindparam("z"),
+ )
)
# compiled SQL has a newline in it
eq_(
diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py
index eec6eed4a..3484fa4c3 100644
--- a/test/dialect/postgresql/test_query.py
+++ b/test/dialect/postgresql/test_query.py
@@ -194,13 +194,11 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
# single execute, explicit id, inline
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
# single execute, inline, uses SERIAL
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
@@ -262,10 +260,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
@@ -338,13 +334,11 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
# single execute, explicit id, inline
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
# single execute, inline, uses SERIAL
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
@@ -406,10 +400,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
@@ -467,10 +459,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
@@ -532,10 +522,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
@@ -634,7 +622,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(),
[{"id": 31, "data": "d2"}, {"id": 32, "data": "d3"}],
)
- conn.execute(table.insert(inline=True), {"id": 33, "data": "d4"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d4"})
eq_(
conn.execute(table.select()).fetchall(),
[(30, "d1"), (31, "d2"), (32, "d3"), (33, "d4")],
@@ -671,7 +659,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(),
[{"id": 31, "data": "d2"}, {"id": 32, "data": "d3"}],
)
- conn.execute(table.insert(inline=True), {"id": 33, "data": "d4"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d4"})
eq_(
conn.execute(table.select()).fetchall(),
[(30, "d1"), (31, "d2"), (32, "d3"), (33, "d4")],
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index 263684e12..b8de35f42 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -1193,7 +1193,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
)
metadata.create_all()
with testing.db.connect() as conn:
- conn.execute("CREATE INDEX idx1 ON t (x) INCLUDE (name)")
+ conn.exec_driver_sql("CREATE INDEX idx1 ON t (x) INCLUDE (name)")
# prior to #5205, this would return:
# [{'column_names': ['x', 'name'],
diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py
index 2d1fd4ee8..f13f25132 100644
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@ -3157,36 +3157,31 @@ class JSONRoundTripTest(fixtures.TablesTest):
).fetchall()
eq_([d for d, in data], [None])
- def _test_insert(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}},
- )
- self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], conn)
+ def _test_insert(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}},
+ )
+ self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], conn)
- def _test_insert_nulls(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(), {"name": "r1", "data": null()}
- )
- self._assert_data([None], conn)
+ def _test_insert_nulls(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(), {"name": "r1", "data": null()}
+ )
+ self._assert_data([None], conn)
- def _test_insert_none_as_null(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "nulldata": None},
- )
- self._assert_column_is_NULL(conn, column="nulldata")
+ def _test_insert_none_as_null(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(), {"name": "r1", "nulldata": None},
+ )
+ self._assert_column_is_NULL(conn, column="nulldata")
- def _test_insert_nulljson_into_none_as_null(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "nulldata": JSON.NULL},
- )
- self._assert_column_is_JSON_NULL(conn, column="nulldata")
+ def _test_insert_nulljson_into_none_as_null(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(),
+ {"name": "r1", "nulldata": JSON.NULL},
+ )
+ self._assert_column_is_JSON_NULL(conn, column="nulldata")
def test_reflect(self):
insp = inspect(testing.db)
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 3a4b58df6..7e3893afd 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -351,6 +351,7 @@ class ExecuteTest(fixtures.TablesTest):
def test_stmt_exception_bytestring_raised(self):
name = util.u("méil")
+ users = self.tables.users
with testing.db.connect() as conn:
assert_raises_message(
tsa.exc.StatementError,
@@ -3120,13 +3121,15 @@ class AutocommitKeywordFixture(object):
with engine.connect() as conn:
conn.exec_driver_sql("%s something table something" % keyword)
- for _call in dbapi.connect().mock_calls:
- _call.kwargs.clear()
-
if expected:
- eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()])
+ eq_(
+ [n for (n, k, s) in dbapi.connect().mock_calls],
+ ["cursor", "commit"],
+ )
else:
- eq_(dbapi.connect().mock_calls, [call.cursor()])
+ eq_(
+ [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"]
+ )
class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
diff --git a/test/orm/inheritance/_poly_fixtures.py b/test/orm/inheritance/_poly_fixtures.py
index 77564bcdb..4253b4bee 100644
--- a/test/orm/inheritance/_poly_fixtures.py
+++ b/test/orm/inheritance/_poly_fixtures.py
@@ -2,9 +2,9 @@ from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import util
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import config
@@ -221,11 +221,9 @@ class _PolymorphicFixtureBase(fixtures.MappedTest, AssertsCompiledSQL):
cls.c2 = c2 = Company(name="Elbonia, Inc.")
c2.employees = [e3]
- sess = create_session(connection)
- sess.add(c1)
- sess.add(c2)
- sess.flush()
- sess.expunge_all()
+ with sessionmaker(connection, expire_on_commit=False).begin() as sess:
+ sess.add(c1)
+ sess.add(c2)
cls.all_employees = [e1, e2, b1, m1, e3]
cls.c1_employees = [e1, e2, b1, m1]
diff --git a/test/orm/inheritance/test_abc_inheritance.py b/test/orm/inheritance/test_abc_inheritance.py
index b5bff78d2..60c488be3 100644
--- a/test/orm/inheritance/test_abc_inheritance.py
+++ b/test/orm/inheritance/test_abc_inheritance.py
@@ -1,303 +1,270 @@
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
+from sqlalchemy import testing
from sqlalchemy.orm import class_mapper
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
from sqlalchemy.orm.interfaces import MANYTOONE
from sqlalchemy.orm.interfaces import ONETOMANY
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-def produce_test(parent, child, direction):
- """produce a testcase for A->B->C inheritance with a self-referential
- relationship between two of the classes, using either one-to-many or
- many-to-one.
+def _combinations():
+ for parent in ["a", "b", "c"]:
+ for child in ["a", "b", "c"]:
+ for direction in [ONETOMANY, MANYTOONE]:
+
+ name = "Test%sTo%s%s" % (
+ parent,
+ child,
+ (direction is ONETOMANY and "O2M" or "M2O"),
+ )
+ yield (name, parent, child, direction)
- the old "no discriminator column" pattern is used.
- """
+@testing.combinations(
+ *list(_combinations()), argnames="name,parent,child,direction", id_="saaa"
+)
+class ABCTest(fixtures.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ parent, child, direction = cls.parent, cls.child, cls.direction
- class ABCTest(fixtures.MappedTest):
- @classmethod
- def define_tables(cls, metadata):
- global ta, tb, tc
- ta = ["a", metadata]
+ ta = ["a", metadata]
+ ta.append(
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True,
+ )
+ ),
+ ta.append(Column("a_data", String(30)))
+ if "a" == parent and direction == MANYTOONE:
ta.append(
Column(
- "id",
+ "child_id",
Integer,
- primary_key=True,
- test_needs_autoincrement=True,
- )
- ),
- ta.append(Column("a_data", String(30)))
- if "a" == parent and direction == MANYTOONE:
- ta.append(
- Column(
- "child_id",
- Integer,
- ForeignKey(
- "%s.id" % child, use_alter=True, name="foo"
- ),
- )
+ ForeignKey("%s.id" % child, use_alter=True, name="foo"),
)
- elif "a" == child and direction == ONETOMANY:
- ta.append(
- Column(
- "parent_id",
- Integer,
- ForeignKey(
- "%s.id" % parent, use_alter=True, name="foo"
- ),
- )
+ )
+ elif "a" == child and direction == ONETOMANY:
+ ta.append(
+ Column(
+ "parent_id",
+ Integer,
+ ForeignKey("%s.id" % parent, use_alter=True, name="foo"),
)
- ta = Table(*ta)
-
- tb = ["b", metadata]
- tb.append(
- Column("id", Integer, ForeignKey("a.id"), primary_key=True)
)
+ ta = Table(*ta)
- tb.append(Column("b_data", String(30)))
-
- if "b" == parent and direction == MANYTOONE:
- tb.append(
- Column(
- "child_id",
- Integer,
- ForeignKey(
- "%s.id" % child, use_alter=True, name="foo"
- ),
- )
- )
- elif "b" == child and direction == ONETOMANY:
- tb.append(
- Column(
- "parent_id",
- Integer,
- ForeignKey(
- "%s.id" % parent, use_alter=True, name="foo"
- ),
- )
- )
- tb = Table(*tb)
+ tb = ["b", metadata]
+ tb.append(Column("id", Integer, ForeignKey("a.id"), primary_key=True))
- tc = ["c", metadata]
- tc.append(
- Column("id", Integer, ForeignKey("b.id"), primary_key=True)
- )
+ tb.append(Column("b_data", String(30)))
- tc.append(Column("c_data", String(30)))
-
- if "c" == parent and direction == MANYTOONE:
- tc.append(
- Column(
- "child_id",
- Integer,
- ForeignKey(
- "%s.id" % child, use_alter=True, name="foo"
- ),
- )
- )
- elif "c" == child and direction == ONETOMANY:
- tc.append(
- Column(
- "parent_id",
- Integer,
- ForeignKey(
- "%s.id" % parent, use_alter=True, name="foo"
- ),
- )
+ if "b" == parent and direction == MANYTOONE:
+ tb.append(
+ Column(
+ "child_id",
+ Integer,
+ ForeignKey("%s.id" % child, use_alter=True, name="foo"),
)
- tc = Table(*tc)
-
- def teardown(self):
- if direction == MANYTOONE:
- parent_table = {"a": ta, "b": tb, "c": tc}[parent]
- parent_table.update(
- values={parent_table.c.child_id: None}
- ).execute()
- elif direction == ONETOMANY:
- child_table = {"a": ta, "b": tb, "c": tc}[child]
- child_table.update(
- values={child_table.c.parent_id: None}
- ).execute()
- super(ABCTest, self).teardown()
-
- def test_roundtrip(self):
- parent_table = {"a": ta, "b": tb, "c": tc}[parent]
- child_table = {"a": ta, "b": tb, "c": tc}[child]
-
- remote_side = None
-
- if direction == MANYTOONE:
- foreign_keys = [parent_table.c.child_id]
- elif direction == ONETOMANY:
- foreign_keys = [child_table.c.parent_id]
-
- atob = ta.c.id == tb.c.id
- btoc = tc.c.id == tb.c.id
-
- if direction == ONETOMANY:
- relationshipjoin = parent_table.c.id == child_table.c.parent_id
- elif direction == MANYTOONE:
- relationshipjoin = parent_table.c.child_id == child_table.c.id
- if parent is child:
- remote_side = [child_table.c.id]
-
- abcjoin = polymorphic_union(
- {
- "a": ta.select(
- tb.c.id == None, # noqa
- from_obj=[ta.outerjoin(tb, onclause=atob)],
- ).subquery(),
- "b": ta.join(tb, onclause=atob)
- .outerjoin(tc, onclause=btoc)
- .select(tc.c.id == None)
- .reduce_columns()
- .subquery(), # noqa
- "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
- },
- "type",
- "abcjoin",
)
-
- bcjoin = polymorphic_union(
- {
- "b": ta.join(tb, onclause=atob)
- .outerjoin(tc, onclause=btoc)
- .select(tc.c.id == None)
- .reduce_columns()
- .subquery(), # noqa
- "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
- },
- "type",
- "bcjoin",
+ elif "b" == child and direction == ONETOMANY:
+ tb.append(
+ Column(
+ "parent_id",
+ Integer,
+ ForeignKey("%s.id" % parent, use_alter=True, name="foo"),
+ )
)
+ tb = Table(*tb)
- class A(object):
- def __init__(self, name):
- self.a_data = name
+ tc = ["c", metadata]
+ tc.append(Column("id", Integer, ForeignKey("b.id"), primary_key=True))
- class B(A):
- pass
+ tc.append(Column("c_data", String(30)))
- class C(B):
- pass
-
- mapper(
- A,
- ta,
- polymorphic_on=abcjoin.c.type,
- with_polymorphic=("*", abcjoin),
- polymorphic_identity="a",
- )
- mapper(
- B,
- tb,
- polymorphic_on=bcjoin.c.type,
- with_polymorphic=("*", bcjoin),
- polymorphic_identity="b",
- inherits=A,
- inherit_condition=atob,
- )
- mapper(
- C,
- tc,
- polymorphic_identity="c",
- with_polymorphic=("*", tc.join(tb, btoc).join(ta, atob)),
- inherits=B,
- inherit_condition=btoc,
+ if "c" == parent and direction == MANYTOONE:
+ tc.append(
+ Column(
+ "child_id",
+ Integer,
+ ForeignKey("%s.id" % child, use_alter=True, name="foo"),
+ )
)
-
- parent_mapper = class_mapper({ta: A, tb: B, tc: C}[parent_table])
- child_mapper = class_mapper({ta: A, tb: B, tc: C}[child_table])
-
- parent_class = parent_mapper.class_
- child_class = child_mapper.class_
-
- parent_mapper.add_property(
- "collection",
- relationship(
- child_mapper,
- primaryjoin=relationshipjoin,
- foreign_keys=foreign_keys,
- order_by=child_mapper.c.id,
- remote_side=remote_side,
- uselist=True,
- ),
+ elif "c" == child and direction == ONETOMANY:
+ tc.append(
+ Column(
+ "parent_id",
+ Integer,
+ ForeignKey("%s.id" % parent, use_alter=True, name="foo"),
+ )
)
-
- sess = create_session()
-
- parent_obj = parent_class("parent1")
- child_obj = child_class("child1")
- somea = A("somea")
- someb = B("someb")
- somec = C("somec")
-
- # print "APPENDING", parent.__class__.__name__ , "TO",
- # child.__class__.__name__
-
- sess.add(parent_obj)
- parent_obj.collection.append(child_obj)
- if direction == ONETOMANY:
- child2 = child_class("child2")
- parent_obj.collection.append(child2)
- sess.add(child2)
- elif direction == MANYTOONE:
- parent2 = parent_class("parent2")
- parent2.collection.append(child_obj)
- sess.add(parent2)
- sess.add(somea)
- sess.add(someb)
- sess.add(somec)
- sess.flush()
- sess.expunge_all()
-
- # assert result via direct get() of parent object
- result = sess.query(parent_class).get(parent_obj.id)
- assert result.id == parent_obj.id
- assert result.collection[0].id == child_obj.id
- if direction == ONETOMANY:
- assert result.collection[1].id == child2.id
- elif direction == MANYTOONE:
- result2 = sess.query(parent_class).get(parent2.id)
- assert result2.id == parent2.id
- assert result2.collection[0].id == child_obj.id
-
- sess.expunge_all()
-
- # assert result via polymorphic load of parent object
- result = sess.query(A).filter_by(id=parent_obj.id).one()
- assert result.id == parent_obj.id
- assert result.collection[0].id == child_obj.id
- if direction == ONETOMANY:
- assert result.collection[1].id == child2.id
- elif direction == MANYTOONE:
- result2 = sess.query(A).filter_by(id=parent2.id).one()
- assert result2.id == parent2.id
- assert result2.collection[0].id == child_obj.id
-
- ABCTest.__name__ = "Test%sTo%s%s" % (
- parent,
- child,
- (direction is ONETOMANY and "O2M" or "M2O"),
- )
- return ABCTest
-
-
-# test all combinations of polymorphic a/b/c related to another of a/b/c
-for parent in ["a", "b", "c"]:
- for child in ["a", "b", "c"]:
- for direction in [ONETOMANY, MANYTOONE]:
- testclass = produce_test(parent, child, direction)
- exec("%s = testclass" % testclass.__name__)
- del testclass
-
-del produce_test
+ tc = Table(*tc)
+
+ @classmethod
+ def setup_mappers(cls):
+ parent, child, direction = cls.parent, cls.child, cls.direction
+ ta, tb, tc = cls.tables("a", "b", "c")
+ parent_table = {"a": ta, "b": tb, "c": tc}[parent]
+ child_table = {"a": ta, "b": tb, "c": tc}[child]
+
+ remote_side = None
+
+ if direction == MANYTOONE:
+ foreign_keys = [parent_table.c.child_id]
+ elif direction == ONETOMANY:
+ foreign_keys = [child_table.c.parent_id]
+
+ atob = ta.c.id == tb.c.id
+ btoc = tc.c.id == tb.c.id
+
+ if direction == ONETOMANY:
+ relationshipjoin = parent_table.c.id == child_table.c.parent_id
+ elif direction == MANYTOONE:
+ relationshipjoin = parent_table.c.child_id == child_table.c.id
+ if parent is child:
+ remote_side = [child_table.c.id]
+
+ abcjoin = polymorphic_union(
+ {
+ "a": ta.select()
+ .where(tb.c.id == None) # noqa
+ .select_from(ta.outerjoin(tb, onclause=atob))
+ .subquery(),
+ "b": ta.join(tb, onclause=atob)
+ .outerjoin(tc, onclause=btoc)
+ .select()
+ .where(tc.c.id == None)
+ .reduce_columns()
+ .subquery(), # noqa
+ "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
+ },
+ "type",
+ "abcjoin",
+ )
+
+ bcjoin = polymorphic_union(
+ {
+ "b": ta.join(tb, onclause=atob)
+ .outerjoin(tc, onclause=btoc)
+ .select()
+ .where(tc.c.id == None)
+ .reduce_columns()
+ .subquery(), # noqa
+ "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
+ },
+ "type",
+ "bcjoin",
+ )
+
+ class A(cls.Comparable):
+ def __init__(self, name):
+ self.a_data = name
+
+ class B(A):
+ pass
+
+ class C(B):
+ pass
+
+ mapper(
+ A,
+ ta,
+ polymorphic_on=abcjoin.c.type,
+ with_polymorphic=("*", abcjoin),
+ polymorphic_identity="a",
+ )
+ mapper(
+ B,
+ tb,
+ polymorphic_on=bcjoin.c.type,
+ with_polymorphic=("*", bcjoin),
+ polymorphic_identity="b",
+ inherits=A,
+ inherit_condition=atob,
+ )
+ mapper(
+ C,
+ tc,
+ polymorphic_identity="c",
+ with_polymorphic=("*", tc.join(tb, btoc).join(ta, atob)),
+ inherits=B,
+ inherit_condition=btoc,
+ )
+
+ parent_mapper = class_mapper({ta: A, tb: B, tc: C}[parent_table])
+ child_mapper = class_mapper({ta: A, tb: B, tc: C}[child_table])
+
+ parent_mapper.add_property(
+ "collection",
+ relationship(
+ child_mapper,
+ primaryjoin=relationshipjoin,
+ foreign_keys=foreign_keys,
+ order_by=child_mapper.c.id,
+ remote_side=remote_side,
+ uselist=True,
+ ),
+ )
+
+ def test_roundtrip(self):
+ parent, child, direction = self.parent, self.child, self.direction
+ A, B, C = self.classes("A", "B", "C")
+ parent_class = {"a": A, "b": B, "c": C}[parent]
+ child_class = {"a": A, "b": B, "c": C}[child]
+
+ sess = create_session()
+
+ parent_obj = parent_class("parent1")
+ child_obj = child_class("child1")
+ somea = A("somea")
+ someb = B("someb")
+ somec = C("somec")
+
+ # print "APPENDING", parent.__class__.__name__ , "TO",
+ # child.__class__.__name__
+
+ sess.add(parent_obj)
+ parent_obj.collection.append(child_obj)
+ if direction == ONETOMANY:
+ child2 = child_class("child2")
+ parent_obj.collection.append(child2)
+ sess.add(child2)
+ elif direction == MANYTOONE:
+ parent2 = parent_class("parent2")
+ parent2.collection.append(child_obj)
+ sess.add(parent2)
+ sess.add(somea)
+ sess.add(someb)
+ sess.add(somec)
+ sess.commit()
+ sess.close()
+
+ # assert result via direct get() of parent object
+ result = sess.get(parent_class, parent_obj.id)
+ assert result.id == parent_obj.id
+ assert result.collection[0].id == child_obj.id
+ if direction == ONETOMANY:
+ assert result.collection[1].id == child2.id
+ elif direction == MANYTOONE:
+ result2 = sess.get(parent_class, parent2.id)
+ assert result2.id == parent2.id
+ assert result2.collection[0].id == child_obj.id
+
+ sess.expunge_all()
+
+ # assert result via polymorphic load of parent object
+ result = sess.query(A).filter_by(id=parent_obj.id).one()
+ assert result.id == parent_obj.id
+ assert result.collection[0].id == child_obj.id
+ if direction == ONETOMANY:
+ assert result.collection[1].id == child2.id
+ elif direction == MANYTOONE:
+ result2 = sess.query(A).filter_by(id=parent2.id).one()
+ assert result2.id == parent2.id
+ assert result2.collection[0].id == child_obj.id
diff --git a/test/orm/inheritance/test_abc_polymorphic.py b/test/orm/inheritance/test_abc_polymorphic.py
index cf06c9e26..0d28ef342 100644
--- a/test/orm/inheritance/test_abc_polymorphic.py
+++ b/test/orm/inheritance/test_abc_polymorphic.py
@@ -2,10 +2,10 @@ from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import testing
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
diff --git a/test/orm/inheritance/test_assorted_poly.py b/test/orm/inheritance/test_assorted_poly.py
index fe6880591..2767607cb 100644
--- a/test/orm/inheritance/test_assorted_poly.py
+++ b/test/orm/inheritance/test_assorted_poly.py
@@ -16,18 +16,19 @@ from sqlalchemy import util
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import column_property
from sqlalchemy.orm import contains_eager
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import join
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
+from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import with_polymorphic
from sqlalchemy.orm.interfaces import MANYTOONE
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -115,8 +116,8 @@ class RelationshipTest1(fixtures.MappedTest):
session.flush()
session.expunge_all()
- p = session.query(Person).get(p.person_id)
- m = session.query(Manager).get(m.person_id)
+ p = session.get(Person, p.person_id)
+ m = session.get(Manager, m.person_id)
assert p.manager is m
def test_descendant_refs_parent(self):
@@ -147,8 +148,8 @@ class RelationshipTest1(fixtures.MappedTest):
session.flush()
session.expunge_all()
- p = session.query(Person).get(p.person_id)
- m = session.query(Manager).get(m.person_id)
+ p = session.get(Person, p.person_id)
+ m = session.get(Manager, m.person_id)
assert m.employee is p
@@ -215,9 +216,9 @@ class RelationshipTest2(fixtures.MappedTest):
if jointype == "join1":
poly_union = polymorphic_union(
{
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
"manager": join(
people,
managers,
@@ -230,9 +231,9 @@ class RelationshipTest2(fixtures.MappedTest):
elif jointype == "join2":
poly_union = polymorphic_union(
{
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
"manager": managers.join(
people, people.c.person_id == managers.c.person_id
),
@@ -306,8 +307,8 @@ class RelationshipTest2(fixtures.MappedTest):
sess.flush()
sess.expunge_all()
- p = sess.query(Person).get(p.person_id)
- m = sess.query(Manager).get(m.person_id)
+ p = sess.get(Person, p.person_id)
+ m = sess.get(Manager, m.person_id)
assert m.colleague is p
if usedata:
assert m.data.data == "ms data"
@@ -377,9 +378,9 @@ class RelationshipTest3(fixtures.MappedTest):
"manager": managers.join(
people, people.c.person_id == managers.c.person_id
),
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
)
@@ -391,9 +392,9 @@ class RelationshipTest3(fixtures.MappedTest):
managers,
people.c.person_id == managers.c.person_id,
),
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
)
@@ -477,10 +478,10 @@ class RelationshipTest3(fixtures.MappedTest):
sess.flush()
sess.expunge_all()
- p = sess.query(Person).get(p.person_id)
- p2 = sess.query(Person).get(p2.person_id)
- p3 = sess.query(Person).get(p3.person_id)
- m = sess.query(Person).get(m.person_id)
+ p = sess.get(Person, p.person_id)
+ p2 = sess.get(Person, p2.person_id)
+ p3 = sess.get(Person, p3.person_id)
+ m = sess.get(Person, m.person_id)
assert len(p.colleagues) == 1
assert p.colleagues == [p2]
assert m.colleagues == [p3]
@@ -635,17 +636,15 @@ class RelationshipTest4(fixtures.MappedTest):
session.expunge_all()
def go():
- testcar = (
- session.query(Car)
- .options(joinedload("employee"))
- .get(car1.car_id)
+ testcar = session.get(
+ Car, car1.car_id, options=[joinedload("employee")]
)
assert str(testcar.employee) == "Engineer E4, status X"
self.assert_sql_count(testing.db, go, 1)
- car1 = session.query(Car).get(car1.car_id)
- usingGet = session.query(person_mapper).get(car1.owner)
+ car1 = session.get(Car, car1.car_id)
+ usingGet = session.get(person_mapper, car1.owner)
usingProperty = car1.employee
assert str(engineer4) == "Engineer E4, status X"
@@ -656,10 +655,8 @@ class RelationshipTest4(fixtures.MappedTest):
# and now for the lightning round, eager !
def go():
- testcar = (
- session.query(Car)
- .options(joinedload("employee"))
- .get(car1.car_id)
+ testcar = session.get(
+ Car, car1.car_id, options=[joinedload("employee")],
)
assert str(testcar.employee) == "Engineer E4, status X"
@@ -864,8 +861,8 @@ class RelationshipTest6(fixtures.MappedTest):
sess.flush()
sess.expunge_all()
- m = sess.query(Manager).get(m.person_id)
- m2 = sess.query(Manager).get(m2.person_id)
+ m = sess.get(Manager, m.person_id)
+ m2 = sess.get(Manager, m2.person_id)
assert m.colleague is m2
@@ -984,7 +981,8 @@ class RelationshipTest7(fixtures.MappedTest):
car_join = polymorphic_union(
{
"car": cars.outerjoin(offroad_cars)
- .select(offroad_cars.c.car_id == None)
+ .select()
+ .where(offroad_cars.c.car_id == None)
.reduce_columns()
.subquery(),
"offroad": cars.join(offroad_cars),
@@ -1266,42 +1264,38 @@ class GenerativeTest(fixtures.MappedTest, AssertsExecutionResults):
Status, Person, Engineer, Manager, Car = cls.classes(
"Status", "Person", "Engineer", "Manager", "Car"
)
- session = create_session(connection)
+ with sessionmaker(connection).begin() as session:
- active = Status(name="active")
- dead = Status(name="dead")
+ active = Status(name="active")
+ dead = Status(name="dead")
- session.add(active)
- session.add(dead)
- session.flush()
-
- # TODO: we haven't created assertions for all
- # the data combinations created here
+ session.add(active)
+ session.add(dead)
- # creating 5 managers named from M1 to M5
- # and 5 engineers named from E1 to E5
- # M4, M5, E4 and E5 are dead
- for i in range(1, 5):
- if i < 4:
- st = active
- else:
- st = dead
- session.add(
- Manager(name="M%d" % i, category="YYYYYYYYY", status=st)
- )
- session.add(Engineer(name="E%d" % i, field="X", status=st))
+ # TODO: we haven't created assertions for all
+ # the data combinations created here
- session.flush()
+ # creating 5 managers named from M1 to M5
+ # and 5 engineers named from E1 to E5
+ # M4, M5, E4 and E5 are dead
+ for i in range(1, 5):
+ if i < 4:
+ st = active
+ else:
+ st = dead
+ session.add(
+ Manager(name="M%d" % i, category="YYYYYYYYY", status=st)
+ )
+ session.add(Engineer(name="E%d" % i, field="X", status=st))
- # get E4
- engineer4 = session.query(Engineer).filter_by(name="E4").one()
+ # get E4
+ engineer4 = session.query(Engineer).filter_by(name="E4").one()
- # create 2 cars for E4, one active and one dead
- car1 = Car(employee=engineer4, status=active)
- car2 = Car(employee=engineer4, status=dead)
- session.add(car1)
- session.add(car2)
- session.flush()
+ # create 2 cars for E4, one active and one dead
+ car1 = Car(employee=engineer4, status=active)
+ car2 = Car(employee=engineer4, status=dead)
+ session.add(car1)
+ session.add(car2)
def test_join_to_q_person(self):
Status, Person, Engineer, Manager, Car = self.classes(
@@ -1357,7 +1351,7 @@ class GenerativeTest(fixtures.MappedTest, AssertsExecutionResults):
)
session = create_session()
r = session.query(Person).filter(
- exists([1], Car.owner == Person.person_id)
+ exists().where(Car.owner == Person.person_id)
)
eq_(
@@ -1424,9 +1418,9 @@ class MultiLevelTest(fixtures.MappedTest):
.where(table_Employee.c.atype == "Engineer")
.select_from(table_Employee.join(table_Engineer))
.subquery(),
- "Employee": table_Employee.select(
- table_Employee.c.atype == "Employee"
- ).subquery(),
+ "Employee": table_Employee.select()
+ .where(table_Employee.c.atype == "Employee")
+ .subquery(),
},
None,
"pu_employee",
@@ -1540,9 +1534,9 @@ class ManyToManyPolyTest(fixtures.MappedTest):
item_join = polymorphic_union(
{
- "BaseItem": base_item_table.select(
- base_item_table.c.child_name == "BaseItem"
- ).subquery(),
+ "BaseItem": base_item_table.select()
+ .where(base_item_table.c.child_name == "BaseItem")
+ .subquery(),
"Item": base_item_table.join(item_table),
},
None,
@@ -1610,7 +1604,7 @@ class CustomPKTest(fixtures.MappedTest):
# a 2-col pk in any case but the leading select has a NULL for the
# "t2id" column
d = util.OrderedDict()
- d["t1"] = t1.select(t1.c.type == "t1").subquery()
+ d["t1"] = t1.select().where(t1.c.type == "t1").subquery()
d["t2"] = t1.join(t2)
pjoin = polymorphic_union(d, None, "pjoin")
@@ -1634,9 +1628,9 @@ class CustomPKTest(fixtures.MappedTest):
# query using get(), using only one value.
# this requires the select_table mapper
# has the same single-col primary key.
- assert sess.query(T1).get(ot1.id).id == ot1.id
+ assert sess.get(T1, ot1.id).id == ot1.id
- ot1 = sess.query(T1).get(ot1.id)
+ ot1 = sess.get(T1, ot1.id)
ot1.data = "hi"
sess.flush()
@@ -1656,7 +1650,7 @@ class CustomPKTest(fixtures.MappedTest):
# a 2-col pk in any case but the leading select has a NULL for the
# "t2id" column
d = util.OrderedDict()
- d["t1"] = t1.select(t1.c.type == "t1").subquery()
+ d["t1"] = t1.select().where(t1.c.type == "t1").subquery()
d["t2"] = t1.join(t2)
pjoin = polymorphic_union(d, None, "pjoin")
@@ -1681,9 +1675,9 @@ class CustomPKTest(fixtures.MappedTest):
# query using get(), using only one value. this requires the
# select_table mapper
# has the same single-col primary key.
- assert sess.query(T1).get(ot1.id).id == ot1.id
+ assert sess.get(T1, ot1.id).id == ot1.id
- ot1 = sess.query(T1).get(ot1.id)
+ ot1 = sess.get(T1, ot1.id)
ot1.data = "hi"
sess.flush()
diff --git a/test/orm/inheritance/test_basic.py b/test/orm/inheritance/test_basic.py
index 6372812d2..9db11d362 100644
--- a/test/orm/inheritance/test_basic.py
+++ b/test/orm/inheritance/test_basic.py
@@ -19,7 +19,6 @@ from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import column_property
from sqlalchemy.orm import composite
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import deferred
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import joinedload
@@ -43,6 +42,7 @@ from sqlalchemy.testing.assertsql import CompiledSQL
from sqlalchemy.testing.assertsql import Conditional
from sqlalchemy.testing.assertsql import Or
from sqlalchemy.testing.assertsql import RegexSQL
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -194,7 +194,7 @@ class PolyExpressionEagerLoad(fixtures.DeclarativeMappedTest):
child_id = Column(Integer, ForeignKey("a.id"))
child = relationship("A")
- p_a = case([(discriminator == "a", "a")], else_="b")
+ p_a = case((discriminator == "a", "a"), else_="b")
__mapper_args__ = {
"polymorphic_identity": "a",
@@ -455,7 +455,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest):
def test_polymorphic_on_expr_explicit_map(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
mapper(
Parent,
t1,
@@ -470,7 +470,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest):
def test_polymorphic_on_expr_implicit_map_no_label_joined(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, t2, inherits=Parent, polymorphic_identity="child")
@@ -479,9 +479,9 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest):
def test_polymorphic_on_expr_implicit_map_w_label_joined(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case(
- [(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]
- ).label(None)
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")).label(
+ None
+ )
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, t2, inherits=Parent, polymorphic_identity="child")
@@ -492,7 +492,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest):
with a standalone expr"""
t1 = self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, inherits=Parent, polymorphic_identity="child")
@@ -503,9 +503,9 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest):
with a standalone expr"""
t1 = self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case(
- [(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]
- ).label(None)
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")).label(
+ None
+ )
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, inherits=Parent, polymorphic_identity="child")
@@ -514,7 +514,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest):
def test_polymorphic_on_column_prop(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
cprop = column_property(expr)
mapper(
Parent,
@@ -530,7 +530,7 @@ class PolymorphicOnNotLocalTest(fixtures.MappedTest):
def test_polymorphic_on_column_str_prop(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
cprop = column_property(expr)
mapper(
Parent,
@@ -1158,13 +1158,10 @@ class GetTest(fixtures.MappedTest):
class Blub(Bar):
pass
- def test_get_polymorphic(self):
- self._do_get_test(True)
-
- def test_get_nonpolymorphic(self):
- self._do_get_test(False)
-
- def _do_get_test(self, polymorphic):
+ @testing.combinations(
+ ("polymorphic", True), ("test_get_nonpolymorphic", False), id_="ia"
+ )
+ def test_get(self, polymorphic):
foo, Bar, Blub, blub, bar, Foo = (
self.tables.foo,
self.classes.Bar,
@@ -1197,18 +1194,18 @@ class GetTest(fixtures.MappedTest):
if polymorphic:
def go():
- assert sess.query(Foo).get(f.id) is f
- assert sess.query(Foo).get(b.id) is b
- assert sess.query(Foo).get(bl.id) is bl
- assert sess.query(Bar).get(b.id) is b
- assert sess.query(Bar).get(bl.id) is bl
- assert sess.query(Blub).get(bl.id) is bl
+ assert sess.get(Foo, f.id) is f
+ assert sess.get(Foo, b.id) is b
+ assert sess.get(Foo, bl.id) is bl
+ assert sess.get(Bar, b.id) is b
+ assert sess.get(Bar, bl.id) is bl
+ assert sess.get(Blub, bl.id) is bl
# test class mismatches - item is present
# in the identity map but we requested a subclass
- assert sess.query(Blub).get(f.id) is None
- assert sess.query(Blub).get(b.id) is None
- assert sess.query(Bar).get(f.id) is None
+ assert sess.get(Blub, f.id) is None
+ assert sess.get(Blub, b.id) is None
+ assert sess.get(Bar, f.id) is None
self.assert_sql_count(testing.db, go, 0)
else:
@@ -1217,20 +1214,20 @@ class GetTest(fixtures.MappedTest):
# polymorphic. the important part being that get() always
# returns an instance of the query's type.
def go():
- assert sess.query(Foo).get(f.id) is f
+ assert sess.get(Foo, f.id) is f
- bb = sess.query(Foo).get(b.id)
+ bb = sess.get(Foo, b.id)
assert isinstance(b, Foo) and bb.id == b.id
- bll = sess.query(Foo).get(bl.id)
+ bll = sess.get(Foo, bl.id)
assert isinstance(bll, Foo) and bll.id == bl.id
- assert sess.query(Bar).get(b.id) is b
+ assert sess.get(Bar, b.id) is b
- bll = sess.query(Bar).get(bl.id)
+ bll = sess.get(Bar, bl.id)
assert isinstance(bll, Bar) and bll.id == bl.id
- assert sess.query(Blub).get(bl.id) is bl
+ assert sess.get(Blub, bl.id) is bl
self.assert_sql_count(testing.db, go, 3)
@@ -1241,8 +1238,7 @@ class EagerLazyTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
- global foo, bar, bar_foo
- foo = Table(
+ Table(
"foo",
metadata,
Column(
@@ -1250,22 +1246,25 @@ class EagerLazyTest(fixtures.MappedTest):
),
Column("data", String(30)),
)
- bar = Table(
+ Table(
"bar",
metadata,
Column("id", Integer, ForeignKey("foo.id"), primary_key=True),
Column("bar_data", String(30)),
)
- bar_foo = Table(
+ Table(
"bar_foo",
metadata,
Column("bar_id", Integer, ForeignKey("bar.id")),
Column("foo_id", Integer, ForeignKey("foo.id")),
)
- def test_basic(self):
- class Foo(object):
+ @classmethod
+ def setup_mappers(cls):
+ foo, bar, bar_foo = cls.tables("foo", "bar", "bar_foo")
+
+ class Foo(cls.Comparable):
pass
class Bar(Foo):
@@ -1278,17 +1277,24 @@ class EagerLazyTest(fixtures.MappedTest):
"eager", relationship(foos, bar_foo, lazy="joined", viewonly=True)
)
- foo.insert().execute(data="foo1")
- bar.insert().execute(id=1, data="bar1")
+ @classmethod
+ def insert_data(cls, connection):
+ foo, bar, bar_foo = cls.tables("foo", "bar", "bar_foo")
+
+ connection.execute(foo.insert(), dict(data="foo1"))
+ connection.execute(bar.insert(), dict(id=1, data="bar1"))
+
+ connection.execute(foo.insert(), dict(data="foo2"))
+ connection.execute(bar.insert(), dict(id=2, data="bar2"))
- foo.insert().execute(data="foo2")
- bar.insert().execute(id=2, data="bar2")
+ connection.execute(foo.insert(), dict(data="foo3")) # 3
+ connection.execute(foo.insert(), dict(data="foo4")) # 4
- foo.insert().execute(data="foo3") # 3
- foo.insert().execute(data="foo4") # 4
+ connection.execute(bar_foo.insert(), dict(bar_id=1, foo_id=3))
+ connection.execute(bar_foo.insert(), dict(bar_id=2, foo_id=4))
- bar_foo.insert().execute(bar_id=1, foo_id=3)
- bar_foo.insert().execute(bar_id=2, foo_id=4)
+ def test_basic(self):
+ Bar = self.classes.Bar
sess = create_session()
q = sess.query(Bar)
@@ -1464,7 +1470,7 @@ class FlushTest(fixtures.MappedTest):
sess.add(a)
sess.flush()
- eq_(select(func.count("*")).select_from(user_roles).scalar(), 1)
+ eq_(sess.scalar(select(func.count("*")).select_from(user_roles)), 1)
def test_two(self):
admins, users, roles, user_roles = (
@@ -1514,7 +1520,7 @@ class FlushTest(fixtures.MappedTest):
a.password = "sadmin"
sess.flush()
- eq_(select(func.count("*")).select_from(user_roles).scalar(), 1)
+ eq_(sess.scalar(select(func.count("*")).select_from(user_roles)), 1)
class PassiveDeletesTest(fixtures.MappedTest):
@@ -2095,16 +2101,15 @@ class DistinctPKTest(fixtures.MappedTest):
def _do_test(self, composite):
session = create_session()
- query = session.query(Employee)
if composite:
- alice1 = query.get([1, 2])
- bob = query.get([2, 3])
- alice2 = query.get([1, 2])
+ alice1 = session.get(Employee, [1, 2])
+ bob = session.get(Employee, [2, 3])
+ alice2 = session.get(Employee, [1, 2])
else:
- alice1 = query.get(1)
- bob = query.get(2)
- alice2 = query.get(1)
+ alice1 = session.get(Employee, 1)
+ bob = session.get(Employee, 2)
+ alice2 = session.get(Employee, 1)
assert alice1.name == alice2.name == "alice"
assert bob.name == "bob"
@@ -2142,22 +2147,23 @@ class SyncCompileTest(fixtures.MappedTest):
Column("data3", String(128)),
)
- def test_joins(self):
- for j1 in (
- None,
- _b_table.c.a_id == _a_table.c.id,
- _a_table.c.id == _b_table.c.a_id,
- ):
- for j2 in (
- None,
- _b_table.c.a_id == _c_table.c.b_a_id,
- _c_table.c.b_a_id == _b_table.c.a_id,
- ):
- self._do_test(j1, j2)
- for t in reversed(_a_table.metadata.sorted_tables):
- t.delete().execute().close()
-
- def _do_test(self, j1, j2):
+ @testing.combinations(
+ lambda _a_table, _b_table: None,
+ lambda _a_table, _b_table: _b_table.c.a_id == _a_table.c.id,
+ lambda _a_table, _b_table: _a_table.c.id == _b_table.c.a_id,
+ argnames="j1",
+ )
+ @testing.combinations(
+ lambda _b_table, _c_table: None,
+ lambda _b_table, _c_table: _b_table.c.a_id == _c_table.c.b_a_id,
+ lambda _b_table, _c_table: _c_table.c.b_a_id == _b_table.c.a_id,
+ argnames="j2",
+ )
+ def test_joins(self, j1, j2):
+ _a_table, _b_table, _c_table = self.tables("a", "b", "c")
+ j1 = testing.resolve_lambda(j1, **locals())
+ j2 = testing.resolve_lambda(j2, **locals())
+
class A(object):
def __init__(self, **kwargs):
for key, value in list(kwargs.items()):
@@ -2282,7 +2288,7 @@ class OverrideColKeyTest(fixtures.MappedTest):
sess = create_session()
sess.add(s1)
sess.flush()
- assert sess.query(Sub).get(10) is s1
+ assert sess.get(Sub, 10) is s1
def test_override_onlyinparent(self):
class Base(object):
@@ -2312,7 +2318,7 @@ class OverrideColKeyTest(fixtures.MappedTest):
sess.flush()
# s1 gets '10'
- assert sess.query(Sub).get(10) is s1
+ assert sess.get(Sub, 10) is s1
# s2 gets a new id, base_id is overwritten by the ultimate
# PK col
@@ -2432,8 +2438,8 @@ class OverrideColKeyTest(fixtures.MappedTest):
sess.flush()
sess.expunge_all()
- assert sess.query(Base).get(b1.base_id).subdata == "this is base"
- assert sess.query(Sub).get(s1.base_id).subdata == "this is sub"
+ assert sess.get(Base, b1.base_id).subdata == "this is base"
+ assert sess.get(Sub, s1.base_id).subdata == "this is sub"
def test_base_descriptors_over_base_cols(self):
class Base(object):
@@ -2457,8 +2463,8 @@ class OverrideColKeyTest(fixtures.MappedTest):
sess.flush()
sess.expunge_all()
- assert sess.query(Base).get(b1.base_id).data == "this is base"
- assert sess.query(Sub).get(s1.base_id).data == "this is base"
+ assert sess.get(Base, b1.base_id).data == "this is base"
+ assert sess.get(Sub, s1.base_id).data == "this is base"
class OptimizedLoadTest(fixtures.MappedTest):
@@ -2587,7 +2593,7 @@ class OptimizedLoadTest(fixtures.MappedTest):
polymorphic_identity="sub",
with_polymorphic=(
"*",
- base.outerjoin(sub).select(use_labels=True).alias("foo"),
+ base.outerjoin(sub).select().apply_labels().alias("foo"),
),
)
sess = Session()
@@ -3602,4 +3608,4 @@ class NameConflictTest(fixtures.MappedTest):
sess.flush()
f_id = f.id
sess.expunge_all()
- assert sess.query(Content).get(f_id).content_type == "bar"
+ assert sess.get(Content, f_id).content_type == "bar"
diff --git a/test/orm/inheritance/test_concrete.py b/test/orm/inheritance/test_concrete.py
index 376337b25..9b0a050d7 100644
--- a/test/orm/inheritance/test_concrete.py
+++ b/test/orm/inheritance/test_concrete.py
@@ -7,7 +7,6 @@ from sqlalchemy.orm import attributes
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import configure_mappers
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
@@ -18,6 +17,7 @@ from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -359,7 +359,7 @@ class ConcreteTest(fixtures.MappedTest):
eq_(test_calls.mock_calls, [mock.call.engineer_info_instance()])
session.add(tom)
- session.flush()
+ session.commit()
session.close()
@@ -443,9 +443,9 @@ class ConcreteTest(fixtures.MappedTest):
assert (
len(
- testing.db.execute(
- session.query(Employee).with_labels().statement
- ).fetchall()
+ session.connection()
+ .execute(session.query(Employee).with_labels().statement)
+ .fetchall()
)
== 3
)
@@ -518,17 +518,19 @@ class ConcreteTest(fixtures.MappedTest):
session.flush()
eq_(
len(
- testing.db.execute(
+ session.connection()
+ .execute(
session.query(Employee)
.with_polymorphic("*", pjoin, pjoin.c.type)
.with_labels()
.statement
- ).fetchall()
+ )
+ .fetchall()
),
4,
)
- eq_(session.query(Employee).get(jdoe.employee_id), jdoe)
- eq_(session.query(Engineer).get(jerry.employee_id), jerry)
+ eq_(session.get(Employee, jdoe.employee_id), jdoe)
+ eq_(session.get(Engineer, jerry.employee_id), jerry)
eq_(
set(
[
@@ -575,33 +577,41 @@ class ConcreteTest(fixtures.MappedTest):
# test adaption of the column by wrapping the query in a
# subquery
- eq_(
- len(
- testing.db.execute(
- session.query(Engineer)
- .with_polymorphic("*", pjoin2, pjoin2.c.type)
- .from_self()
- .statement
- ).fetchall()
- ),
- 2,
- )
- eq_(
- set(
- [
- repr(x)
- for x in session.query(Engineer)
- .with_polymorphic("*", pjoin2, pjoin2.c.type)
- .from_self()
- ]
- ),
- set(
- [
- "Engineer Jerry knows how to program",
- "Hacker Kurt 'Badass' knows how to hack",
- ]
- ),
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ len(
+ session.connection()
+ .execute(
+ session.query(Engineer)
+ .with_polymorphic("*", pjoin2, pjoin2.c.type)
+ .from_self()
+ .statement
+ )
+ .fetchall()
+ ),
+ 2,
+ )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ set(
+ [
+ repr(x)
+ for x in session.query(Engineer)
+ .with_polymorphic("*", pjoin2, pjoin2.c.type)
+ .from_self()
+ ]
+ ),
+ set(
+ [
+ "Engineer Jerry knows how to program",
+ "Hacker Kurt 'Badass' knows how to hack",
+ ]
+ ),
+ )
def test_relationship(self):
pjoin = polymorphic_union(
@@ -638,7 +648,7 @@ class ConcreteTest(fixtures.MappedTest):
session.expunge_all()
def go():
- c2 = session.query(Company).get(c.id)
+ c2 = session.get(Company, c.id)
assert set([repr(x) for x in c2.employees]) == set(
[
"Engineer Kurt knows how to hack",
@@ -650,10 +660,8 @@ class ConcreteTest(fixtures.MappedTest):
session.expunge_all()
def go():
- c2 = (
- session.query(Company)
- .options(joinedload(Company.employees))
- .get(c.id)
+ c2 = session.get(
+ Company, c.id, options=[joinedload(Company.employees)]
)
assert set([repr(x) for x in c2.employees]) == set(
[
@@ -1174,13 +1182,17 @@ class ColKeysTest(fixtures.MappedTest):
def insert_data(cls, connection):
connection.execute(
refugees_table.insert(),
- dict(refugee_fid=1, name="refugee1"),
- dict(refugee_fid=2, name="refugee2"),
+ [
+ dict(refugee_fid=1, name="refugee1"),
+ dict(refugee_fid=2, name="refugee2"),
+ ],
)
connection.execute(
offices_table.insert(),
- dict(office_fid=1, name="office1"),
- dict(office_fid=2, name="office2"),
+ [
+ dict(office_fid=1, name="office1"),
+ dict(office_fid=2, name="office2"),
+ ],
)
def test_keys(self):
@@ -1220,7 +1232,7 @@ class ColKeysTest(fixtures.MappedTest):
polymorphic_identity="refugee",
)
sess = create_session()
- eq_(sess.query(Refugee).get(1).name, "refugee1")
- eq_(sess.query(Refugee).get(2).name, "refugee2")
- eq_(sess.query(Office).get(1).name, "office1")
- eq_(sess.query(Office).get(2).name, "office2")
+ eq_(sess.get(Refugee, 1).name, "refugee1")
+ eq_(sess.get(Refugee, 2).name, "refugee2")
+ eq_(sess.get(Office, 1).name, "office1")
+ eq_(sess.get(Office, 2).name, "office2")
diff --git a/test/orm/inheritance/test_magazine.py b/test/orm/inheritance/test_magazine.py
index 228cb1273..334ed22f3 100644
--- a/test/orm/inheritance/test_magazine.py
+++ b/test/orm/inheritance/test_magazine.py
@@ -247,9 +247,9 @@ class MagazineTest(fixtures.MappedTest):
"c": self.tables.page.join(self.tables.magazine_page).join(
self.tables.classified_page
),
- "p": self.tables.page.select(
- self.tables.page.c.type == "p"
- ).subquery(),
+ "p": self.tables.page.select()
+ .where(self.tables.page.c.type == "p")
+ .subquery(),
},
None,
"page_join",
diff --git a/test/orm/inheritance/test_manytomany.py b/test/orm/inheritance/test_manytomany.py
index dc162321f..207ac09c7 100644
--- a/test/orm/inheritance/test_manytomany.py
+++ b/test/orm/inheritance/test_manytomany.py
@@ -5,11 +5,11 @@ from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.orm import class_mapper
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
class InheritTest(fixtures.MappedTest):
@@ -171,7 +171,7 @@ class InheritTest2(fixtures.MappedTest):
# test that "bar.bid" does not need to be referenced in a get
# (ticket 185)
- assert sess.query(Bar).get(b.id).id == b.id
+ assert sess.get(Bar, b.id).id == b.id
def test_basic(self):
class Foo(object):
diff --git a/test/orm/inheritance/test_poly_linked_list.py b/test/orm/inheritance/test_poly_linked_list.py
index e51026858..91be4d167 100644
--- a/test/orm/inheritance/test_poly_linked_list.py
+++ b/test/orm/inheritance/test_poly_linked_list.py
@@ -4,10 +4,10 @@ from sqlalchemy import String
from sqlalchemy.orm import backref
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import configure_mappers
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
diff --git a/test/orm/inheritance/test_poly_persistence.py b/test/orm/inheritance/test_poly_persistence.py
index 25eeb3d1d..99cab870b 100644
--- a/test/orm/inheritance/test_poly_persistence.py
+++ b/test/orm/inheritance/test_poly_persistence.py
@@ -6,7 +6,6 @@ from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
@@ -15,6 +14,7 @@ from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
@@ -123,7 +123,9 @@ class InsertOrderTest(PolymorphTest):
{
"engineer": people.join(engineers),
"manager": people.join(managers),
- "person": people.select(people.c.type == "person").subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
"pjoin",
@@ -191,7 +193,7 @@ class InsertOrderTest(PolymorphTest):
session.add(c)
session.flush()
session.expunge_all()
- eq_(session.query(Company).get(c.company_id), c)
+ eq_(session.get(Company, c.company_id), c)
@testing.combinations(
@@ -235,9 +237,9 @@ class RoundTripTest(PolymorphTest):
{
"engineer": people.join(engineers),
"manager": people.join(managers),
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
"pjoin",
@@ -399,7 +401,7 @@ class RoundTripTest(PolymorphTest):
employees = session.query(Person).order_by(Person.person_id).all()
company = session.query(Company).first()
- eq_(session.query(Person).get(dilbert.person_id), dilbert)
+ eq_(session.get(Person, dilbert.person_id), dilbert)
session.expunge_all()
eq_(
@@ -411,7 +413,7 @@ class RoundTripTest(PolymorphTest):
session.expunge_all()
def go():
- cc = session.query(Company).get(company.company_id)
+ cc = session.get(Company, company.company_id)
eq_(cc.employees, employees)
if not lazy_relationship:
@@ -471,7 +473,7 @@ class RoundTripTest(PolymorphTest):
# the "palias" alias does *not* get sucked up
# into the "person_join" conversion.
palias = people.alias("palias")
- dilbert = session.query(Person).get(dilbert.person_id)
+ dilbert = session.get(Person, dilbert.person_id)
is_(
dilbert,
session.query(Person)
diff --git a/test/orm/inheritance/test_polymorphic_rel.py b/test/orm/inheritance/test_polymorphic_rel.py
index 86e0bd360..69a485d41 100644
--- a/test/orm/inheritance/test_polymorphic_rel.py
+++ b/test/orm/inheritance/test_polymorphic_rel.py
@@ -5,7 +5,6 @@ from sqlalchemy import select
from sqlalchemy import testing
from sqlalchemy import true
from sqlalchemy.orm import aliased
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import defaultload
from sqlalchemy.orm import join
from sqlalchemy.orm import joinedload
@@ -15,6 +14,7 @@ from sqlalchemy.orm import with_polymorphic
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import eq_
from sqlalchemy.testing.assertsql import CompiledSQL
+from sqlalchemy.testing.fixtures import create_session
from ._poly_fixtures import _Polymorphic
from ._poly_fixtures import _PolymorphicAliasedJoins
from ._poly_fixtures import _PolymorphicJoins
@@ -148,18 +148,18 @@ class _PolymorphicTestBase(object):
self.assert_sql_count(testing.db, go, 3)
eq_(
- select(func.count("*"))
- .select_from(
- sess.query(Person)
- .with_polymorphic("*")
- .options(joinedload(Engineer.machines))
- .order_by(Person.person_id)
- .limit(2)
- .offset(1)
- .with_labels()
- .subquery()
- )
- .scalar(),
+ sess.scalar(
+ select(func.count("*")).select_from(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .options(joinedload(Engineer.machines))
+ .order_by(Person.person_id)
+ .limit(2)
+ .offset(1)
+ .with_labels()
+ .subquery()
+ )
+ ),
2,
)
@@ -170,21 +170,21 @@ class _PolymorphicTestBase(object):
"""
sess = create_session()
eq_(
- sess.query(Person).get(e1.person_id),
+ sess.get(Person, e1.person_id),
Engineer(name="dilbert", primary_language="java"),
)
def test_get_two(self):
sess = create_session()
eq_(
- sess.query(Engineer).get(e1.person_id),
+ sess.get(Engineer, e1.person_id),
Engineer(name="dilbert", primary_language="java"),
)
def test_get_three(self):
sess = create_session()
eq_(
- sess.query(Manager).get(b1.person_id),
+ sess.get(Manager, b1.person_id),
Boss(name="pointy haired boss", golf_swing="fore"),
)
@@ -230,7 +230,7 @@ class _PolymorphicTestBase(object):
)
def test_multi_join_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
e = aliased(Person)
c = aliased(Company)
@@ -283,7 +283,7 @@ class _PolymorphicTestBase(object):
eq_(sess.query(Engineer).all()[0], Engineer(name="dilbert"))
def test_filter_on_subclass_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
eq_(
sess.execute(select(Engineer)).scalar(), Engineer(name="dilbert"),
)
@@ -337,7 +337,7 @@ class _PolymorphicTestBase(object):
)
def test_join_from_polymorphic_nonaliased_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
eq_(
sess.execute(
select(Person)
@@ -396,7 +396,7 @@ class _PolymorphicTestBase(object):
)
def test_join_from_polymorphic_flag_aliased_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
pa = aliased(Paperwork)
eq_(
@@ -496,7 +496,7 @@ class _PolymorphicTestBase(object):
)
def test_join_from_with_polymorphic_nonaliased_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
pm = with_polymorphic(Person, [Manager])
eq_(
@@ -1552,16 +1552,19 @@ class _PolymorphicTestBase(object):
palias = aliased(Person)
expected = [(m1, e1), (m1, e2), (m1, b1)]
- eq_(
- sess.query(Person, palias)
- .filter(Person.company_id == palias.company_id)
- .filter(Person.name == "dogbert")
- .filter(Person.person_id > palias.person_id)
- .from_self()
- .order_by(Person.person_id, palias.person_id)
- .all(),
- expected,
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ sess.query(Person, palias)
+ .filter(Person.company_id == palias.company_id)
+ .filter(Person.name == "dogbert")
+ .filter(Person.person_id > palias.person_id)
+ .from_self()
+ .order_by(Person.person_id, palias.person_id)
+ .all(),
+ expected,
+ )
def test_self_referential_two_point_five(self):
"""Using two aliases, the above case works.
@@ -1572,21 +1575,24 @@ class _PolymorphicTestBase(object):
expected = [(m1, e1), (m1, e2), (m1, b1)]
- eq_(
- sess.query(palias, palias2)
- .filter(palias.company_id == palias2.company_id)
- .filter(palias.name == "dogbert")
- .filter(palias.person_id > palias2.person_id)
- .from_self()
- .order_by(palias.person_id, palias2.person_id)
- .all(),
- expected,
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ sess.query(palias, palias2)
+ .filter(palias.company_id == palias2.company_id)
+ .filter(palias.name == "dogbert")
+ .filter(palias.person_id > palias2.person_id)
+ .from_self()
+ .order_by(palias.person_id, palias2.person_id)
+ .all(),
+ expected,
+ )
def test_self_referential_two_future(self):
# TODO: this is the SECOND test *EVER* of an aliased class of
# an aliased class.
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
expected = [(m1, e1), (m1, e2), (m1, b1)]
# not aliasing the first class
@@ -1615,7 +1621,7 @@ class _PolymorphicTestBase(object):
# TODO: this is the first test *EVER* of an aliased class of
# an aliased class. we should add many more tests for this.
# new case added in Id810f485c5f7ed971529489b84694e02a3356d6d
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
expected = [(m1, e1), (m1, e2), (m1, b1)]
# aliasing the first class
diff --git a/test/orm/inheritance/test_productspec.py b/test/orm/inheritance/test_productspec.py
index aab3570c9..5fd2c5a6f 100644
--- a/test/orm/inheritance/test_productspec.py
+++ b/test/orm/inheritance/test_productspec.py
@@ -7,11 +7,11 @@ from sqlalchemy import Integer
from sqlalchemy import LargeBinary
from sqlalchemy import String
from sqlalchemy.orm import backref
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import deferred
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
diff --git a/test/orm/inheritance/test_relationship.py b/test/orm/inheritance/test_relationship.py
index c7d2042b3..8590949a7 100644
--- a/test/orm/inheritance/test_relationship.py
+++ b/test/orm/inheritance/test_relationship.py
@@ -8,12 +8,12 @@ from sqlalchemy.orm import aliased
from sqlalchemy.orm import backref
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import contains_eager
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
+from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import subqueryload
from sqlalchemy.orm import with_polymorphic
from sqlalchemy.testing import AssertsCompiledSQL
@@ -21,6 +21,7 @@ from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing.entities import ComparableEntity
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -589,10 +590,9 @@ class M2MFilterTest(fixtures.MappedTest):
e4 = Engineer(name="e4")
org1 = Organization(name="org1", engineers=[e1, e2])
org2 = Organization(name="org2", engineers=[e3, e4])
- sess = create_session(connection)
- sess.add(org1)
- sess.add(org2)
- sess.flush()
+ with sessionmaker(connection).begin() as sess:
+ sess.add(org1)
+ sess.add(org2)
def test_not_contains(self):
Organization = self.classes.Organization
@@ -838,9 +838,11 @@ class SelfReferentialM2MTest(fixtures.MappedTest, AssertsCompiledSQL):
# another way to check
eq_(
- select(func.count("*"))
- .select_from(q.limit(1).with_labels().subquery())
- .scalar(),
+ sess.scalar(
+ select(func.count("*")).select_from(
+ q.limit(1).with_labels().subquery()
+ )
+ ),
1,
)
assert q.first() is c1
@@ -1134,15 +1136,16 @@ class SubClassEagerToSubClassTest(fixtures.MappedTest):
global p1, p2
Sub, Subparent = cls.classes.Sub, cls.classes.Subparent
- sess = create_session(connection)
- p1 = Subparent(
- data="p1",
- children=[Sub(data="s1"), Sub(data="s2"), Sub(data="s3")],
- )
- p2 = Subparent(data="p2", children=[Sub(data="s4"), Sub(data="s5")])
- sess.add(p1)
- sess.add(p2)
- sess.flush()
+ with sessionmaker(connection).begin() as sess:
+ p1 = Subparent(
+ data="p1",
+ children=[Sub(data="s1"), Sub(data="s2"), Sub(data="s3")],
+ )
+ p2 = Subparent(
+ data="p2", children=[Sub(data="s4"), Sub(data="s5")]
+ )
+ sess.add(p1)
+ sess.add(p2)
def test_joinedload(self):
Subparent = self.classes.Subparent
@@ -1725,7 +1728,7 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest):
"JOIN ep2 ON base2.id = ep2.base2_id",
)
- def test_six(self):
+ def test_six_legacy(self):
Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
s = Session()
@@ -1735,8 +1738,38 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest):
# this query is coming out instead which is equivalent, but not
# totally sure where this happens
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ self.assert_compile(
+ s.query(Sub2).from_self().join(Sub2.ep1).join(Sub2.ep2),
+ "SELECT anon_1.sub2_id AS anon_1_sub2_id, "
+ "anon_1.base2_base1_id AS anon_1_base2_base1_id, "
+ "anon_1.base2_data AS anon_1_base2_data, "
+ "anon_1.sub2_subdata AS anon_1_sub2_subdata "
+ "FROM (SELECT sub2.id AS sub2_id, base2.id AS base2_id, "
+ "base2.base1_id AS base2_base1_id, base2.data AS base2_data, "
+ "sub2.subdata AS sub2_subdata "
+ "FROM base2 JOIN sub2 ON base2.id = sub2.id) AS anon_1 "
+ "JOIN ep1 ON anon_1.sub2_id = ep1.base2_id "
+ "JOIN ep2 ON anon_1.sub2_id = ep2.base2_id",
+ )
+
+ def test_six(self):
+ Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
+
+ # as of from_self() changing in
+ # I3abfb45dd6e50f84f29d39434caa0b550ce27864,
+ # this query is coming out instead which is equivalent, but not
+ # totally sure where this happens
+
+ stmt = select(Sub2)
+
+ subq = aliased(Sub2, stmt.apply_labels().subquery())
+
+ stmt = select(subq).join(subq.ep1).join(Sub2.ep2).apply_labels()
self.assert_compile(
- s.query(Sub2).from_self().join(Sub2.ep1).join(Sub2.ep2),
+ stmt,
"SELECT anon_1.sub2_id AS anon_1_sub2_id, "
"anon_1.base2_base1_id AS anon_1_base2_base1_id, "
"anon_1.base2_data AS anon_1_base2_data, "
@@ -1749,7 +1782,7 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest):
"JOIN ep2 ON anon_1.sub2_id = ep2.base2_id",
)
- def test_seven(self):
+ def test_seven_legacy(self):
Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
s = Session()
@@ -1758,16 +1791,74 @@ class SubClassToSubClassMultiTest(AssertsCompiledSQL, fixtures.MappedTest):
# I3abfb45dd6e50f84f29d39434caa0b550ce27864,
# this query is coming out instead which is equivalent, but not
# totally sure where this happens
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+
+ self.assert_compile(
+ # adding Sub2 to the entities list helps it,
+ # otherwise the joins for Sub2.ep1/ep2 don't have columns
+ # to latch onto. Can't really make it better than this
+ s.query(Parent, Sub2)
+ .join(Parent.sub1)
+ .join(Sub1.sub2)
+ .from_self()
+ .join(Sub2.ep1)
+ .join(Sub2.ep2),
+ "SELECT anon_1.parent_id AS anon_1_parent_id, "
+ "anon_1.parent_data AS anon_1_parent_data, "
+ "anon_1.sub2_id AS anon_1_sub2_id, "
+ "anon_1.base2_base1_id AS anon_1_base2_base1_id, "
+ "anon_1.base2_data AS anon_1_base2_data, "
+ "anon_1.sub2_subdata AS anon_1_sub2_subdata "
+ "FROM (SELECT parent.id AS parent_id, "
+ "parent.data AS parent_data, "
+ "sub2.id AS sub2_id, "
+ "base2.id AS base2_id, "
+ "base2.base1_id AS base2_base1_id, "
+ "base2.data AS base2_data, "
+ "sub2.subdata AS sub2_subdata "
+ "FROM parent JOIN (base1 JOIN sub1 ON base1.id = sub1.id) "
+ "ON parent.id = sub1.parent_id JOIN "
+ "(base2 JOIN sub2 ON base2.id = sub2.id) "
+ "ON base1.id = base2.base1_id) AS anon_1 "
+ "JOIN ep1 ON anon_1.sub2_id = ep1.base2_id "
+ "JOIN ep2 ON anon_1.sub2_id = ep2.base2_id",
+ )
+
+ def test_seven(self):
+ Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
+
+ # as of from_self() changing in
+ # I3abfb45dd6e50f84f29d39434caa0b550ce27864,
+ # this query is coming out instead which is equivalent, but not
+ # totally sure where this happens
+
+ subq = (
+ select(Parent, Sub2)
+ .join(Parent.sub1)
+ .join(Sub1.sub2)
+ .apply_labels()
+ .subquery()
+ )
+
+ # another 1.4 supercharged select() statement ;)
+
+ palias = aliased(Parent, subq)
+ sub2alias = aliased(Sub2, subq)
+
+ stmt = (
+ select(palias, sub2alias)
+ .join(sub2alias.ep1)
+ .join(sub2alias.ep2)
+ .apply_labels()
+ )
+
self.assert_compile(
# adding Sub2 to the entities list helps it,
# otherwise the joins for Sub2.ep1/ep2 don't have columns
# to latch onto. Can't really make it better than this
- s.query(Parent, Sub2)
- .join(Parent.sub1)
- .join(Sub1.sub2)
- .from_self()
- .join(Sub2.ep1)
- .join(Sub2.ep2),
+ stmt,
"SELECT anon_1.parent_id AS anon_1_parent_id, "
"anon_1.parent_data AS anon_1_parent_data, "
"anon_1.sub2_id AS anon_1_sub2_id, "
diff --git a/test/orm/inheritance/test_single.py b/test/orm/inheritance/test_single.py
index 4adb1a5cb..65dd9c251 100644
--- a/test/orm/inheritance/test_single.py
+++ b/test/orm/inheritance/test_single.py
@@ -12,7 +12,6 @@ from sqlalchemy import testing
from sqlalchemy import true
from sqlalchemy.orm import aliased
from sqlalchemy.orm import Bundle
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
@@ -24,6 +23,7 @@ from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertsql import CompiledSQL
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -280,12 +280,53 @@ class SingleInheritanceTest(testing.AssertsCompiledSQL, fixtures.MappedTest):
[e2id],
)
- def test_from_self(self):
+ def test_from_self_legacy(self):
Engineer = self.classes.Engineer
sess = create_session()
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ self.assert_compile(
+ sess.query(Engineer).from_self(),
+ "SELECT anon_1.employees_employee_id AS "
+ "anon_1_employees_employee_id, "
+ "anon_1.employees_name AS "
+ "anon_1_employees_name, "
+ "anon_1.employees_manager_data AS "
+ "anon_1_employees_manager_data, "
+ "anon_1.employees_engineer_info AS "
+ "anon_1_employees_engineer_info, "
+ "anon_1.employees_type AS "
+ "anon_1_employees_type FROM (SELECT "
+ "employees.employee_id AS "
+ "employees_employee_id, employees.name AS "
+ "employees_name, employees.manager_data AS "
+ "employees_manager_data, "
+ "employees.engineer_info AS "
+ "employees_engineer_info, employees.type "
+ "AS employees_type FROM employees WHERE "
+ "employees.type IN ([POSTCOMPILE_type_1])) AS "
+ "anon_1",
+ use_default_dialect=True,
+ )
+
+ def test_from_subq(self):
+ Engineer = self.classes.Engineer
+
+ stmt = select(Engineer)
+ subq = aliased(Engineer, stmt.apply_labels().subquery())
+
+ # so here we have an extra "WHERE type in ()", because
+ # both the inner and the outer queries have the Engineer entity.
+ # this is expected at the moment but it would be nice if
+ # _enable_single_crit or something similar could propagate here.
+ # legacy from_self() takes care of this because it applies
+ # _enable_single_crit at that moment.
+
+ stmt = select(subq).apply_labels()
self.assert_compile(
- sess.query(Engineer).from_self(),
+ stmt,
"SELECT anon_1.employees_employee_id AS "
"anon_1_employees_employee_id, "
"anon_1.employees_name AS "
@@ -304,7 +345,7 @@ class SingleInheritanceTest(testing.AssertsCompiledSQL, fixtures.MappedTest):
"employees_engineer_info, employees.type "
"AS employees_type FROM employees WHERE "
"employees.type IN ([POSTCOMPILE_type_1])) AS "
- "anon_1",
+ "anon_1 WHERE anon_1.employees_type IN ([POSTCOMPILE_type_2])",
use_default_dialect=True,
)
@@ -382,14 +423,17 @@ class SingleInheritanceTest(testing.AssertsCompiledSQL, fixtures.MappedTest):
sess = create_session()
col = func.count(literal_column("*"))
- self.assert_compile(
- sess.query(Engineer.employee_id).from_self(col),
- "SELECT count(*) AS count_1 "
- "FROM (SELECT employees.employee_id AS employees_employee_id "
- "FROM employees "
- "WHERE employees.type IN ([POSTCOMPILE_type_1])) AS anon_1",
- use_default_dialect=True,
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ self.assert_compile(
+ sess.query(Engineer.employee_id).from_self(col),
+ "SELECT count(*) AS count_1 "
+ "FROM (SELECT employees.employee_id AS employees_employee_id "
+ "FROM employees "
+ "WHERE employees.type IN ([POSTCOMPILE_type_1])) AS anon_1",
+ use_default_dialect=True,
+ )
def test_select_from_count(self):
Manager, Engineer = (self.classes.Manager, self.classes.Engineer)
diff --git a/test/orm/test_deprecations.py b/test/orm/test_deprecations.py
index 08dad2e1b..48a8b99a6 100644
--- a/test/orm/test_deprecations.py
+++ b/test/orm/test_deprecations.py
@@ -2134,7 +2134,7 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL):
class DeclarativeBind(fixtures.TestBase):
def test_declarative_base(self):
with testing.expect_deprecated_20(
- 'The "bind" argument to declarative_base is'
+ "The ``bind`` argument to declarative_base is "
"deprecated and will be removed in SQLAlchemy 2.0.",
):
Base = declarative_base(bind=testing.db)
@@ -2143,7 +2143,7 @@ class DeclarativeBind(fixtures.TestBase):
def test_as_declarative(self):
with testing.expect_deprecated_20(
- 'The "bind" argument to declarative_base is'
+ "The ``bind`` argument to as_declarative is "
"deprecated and will be removed in SQLAlchemy 2.0.",
):
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py
index 74fe8876d..4bef1df7f 100644
--- a/test/sql/test_case_statement.py
+++ b/test/sql/test_case_statement.py
@@ -27,7 +27,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
@classmethod
def setup_class(cls):
- metadata = MetaData(testing.db)
+ metadata = MetaData()
global info_table
info_table = Table(
"infos",
@@ -36,24 +36,29 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
Column("info", String(30)),
)
- info_table.create()
-
- info_table.insert().execute(
- {"pk": 1, "info": "pk_1_data"},
- {"pk": 2, "info": "pk_2_data"},
- {"pk": 3, "info": "pk_3_data"},
- {"pk": 4, "info": "pk_4_data"},
- {"pk": 5, "info": "pk_5_data"},
- {"pk": 6, "info": "pk_6_data"},
- )
+ with testing.db.begin() as conn:
+ info_table.create(conn)
+
+ conn.execute(
+ info_table.insert(),
+ [
+ {"pk": 1, "info": "pk_1_data"},
+ {"pk": 2, "info": "pk_2_data"},
+ {"pk": 3, "info": "pk_3_data"},
+ {"pk": 4, "info": "pk_4_data"},
+ {"pk": 5, "info": "pk_5_data"},
+ {"pk": 6, "info": "pk_6_data"},
+ ],
+ )
@classmethod
def teardown_class(cls):
- info_table.drop()
+ with testing.db.begin() as conn:
+ info_table.drop(conn)
@testing.fails_on("firebird", "FIXME: unknown")
@testing.requires.subqueries
- def test_case(self):
+ def test_case(self, connection):
inner = select(
case(
(info_table.c.pk < 3, "lessthan3"),
@@ -63,7 +68,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
info_table.c.info,
).select_from(info_table)
- inner_result = inner.execute().fetchall()
+ inner_result = connection.execute(inner).all()
# Outputs:
# lessthan3 1 pk_1_data
@@ -86,7 +91,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
outer = select(inner.alias("q_inner"))
- outer_result = outer.execute().fetchall()
+ outer_result = connection.execute(outer).all()
assert outer_result == [
("lessthan3", 1, "pk_1_data"),
@@ -107,7 +112,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
info_table.c.info,
).select_from(info_table)
- else_result = w_else.execute().fetchall()
+ else_result = connection.execute(w_else).all()
eq_(
else_result,
@@ -149,7 +154,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
"CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END",
)
- def test_text_doesnt_explode(self):
+ def test_text_doesnt_explode(self, connection):
for s in [
select(
@@ -169,7 +174,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
).order_by(info_table.c.info),
]:
eq_(
- s.execute().fetchall(),
+ connection.execute(s).all(),
[("no",), ("no",), ("no",), ("yes",), ("no",), ("no",)],
)
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index f9bec7914..6cb1c3841 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -351,9 +351,9 @@ class DefaultObjectTest(fixtures.TestBase):
assert_raises_message(
sa.exc.ArgumentError,
r"SQL expression for WHERE/HAVING role expected, "
- r"got \[(?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)\]",
- t.select,
- [const],
+ r"got (?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)",
+ t.select().where,
+ const,
)
assert_raises_message(
sa.exc.ArgumentError,
@@ -1006,29 +1006,31 @@ class PKIncrementTest(fixtures.TablesTest):
# TODO: add coverage for increment on a secondary column in a key
@testing.fails_on("firebird", "Data type unknown")
- def _test_autoincrement(self, bind):
+ def _test_autoincrement(self, connection):
aitable = self.tables.aitable
ids = set()
- rs = bind.execute(aitable.insert(), int1=1)
+ rs = connection.execute(aitable.insert(), int1=1)
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(), str1="row 2")
+ rs = connection.execute(aitable.insert(), str1="row 2")
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(), int1=3, str1="row 3")
+ rs = connection.execute(aitable.insert(), int1=3, str1="row 3")
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(values={"int1": func.length("four")}))
+ rs = connection.execute(
+ aitable.insert().values({"int1": func.length("four")})
+ )
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
@@ -1045,7 +1047,7 @@ class PKIncrementTest(fixtures.TablesTest):
)
eq_(
- list(bind.execute(aitable.select().order_by(aitable.c.id))),
+ list(connection.execute(aitable.select().order_by(aitable.c.id))),
[
(testing.db.dialect.default_sequence_base, 1, None),
(testing.db.dialect.default_sequence_base + 1, None, "row 2"),
@@ -1055,7 +1057,8 @@ class PKIncrementTest(fixtures.TablesTest):
)
def test_autoincrement_autocommit(self):
- self._test_autoincrement(testing.db)
+ with testing.db.connect() as conn:
+ self._test_autoincrement(conn)
def test_autoincrement_transaction(self):
with testing.db.begin() as conn:
diff --git a/test/sql/test_delete.py b/test/sql/test_delete.py
index 2e3ba4245..934022560 100644
--- a/test/sql/test_delete.py
+++ b/test/sql/test_delete.py
@@ -57,7 +57,7 @@ class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- delete(table1, table1.c.myid == 7),
+ delete(table1).where(table1.c.myid == 7),
"DELETE FROM mytable WHERE mytable.myid = :myid_1",
)
@@ -118,7 +118,7 @@ class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test a non-correlated WHERE clause
s = select(table2.c.othername).where(table2.c.otherid == 7)
self.assert_compile(
- delete(table1, table1.c.name == s.scalar_subquery()),
+ delete(table1).where(table1.c.name == s.scalar_subquery()),
"DELETE FROM mytable "
"WHERE mytable.name = ("
"SELECT myothertable.othername "
@@ -133,7 +133,7 @@ class DeleteTest(_DeleteTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test one that is actually correlated...
s = select(table2.c.othername).where(table2.c.otherid == table1.c.myid)
self.assert_compile(
- table1.delete(table1.c.name == s.scalar_subquery()),
+ table1.delete().where(table1.c.name == s.scalar_subquery()),
"DELETE FROM mytable "
"WHERE mytable.name = ("
"SELECT myothertable.othername "
diff --git a/test/sql/test_deprecations.py b/test/sql/test_deprecations.py
index a72974dbb..30e4338b5 100644
--- a/test/sql/test_deprecations.py
+++ b/test/sql/test_deprecations.py
@@ -1,5 +1,8 @@
#! coding: utf-8
+import itertools
+import random
+
from sqlalchemy import alias
from sqlalchemy import and_
from sqlalchemy import bindparam
@@ -28,9 +31,11 @@ from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
from sqlalchemy.sql import coercions
+from sqlalchemy.sql import literal
from sqlalchemy.sql import operators
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import roles
+from sqlalchemy.sql import update
from sqlalchemy.sql import visitors
from sqlalchemy.sql.selectable import SelectStatementGrouping
from sqlalchemy.testing import assert_raises
@@ -46,6 +51,7 @@ from sqlalchemy.testing import mock
from sqlalchemy.testing import not_in
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+from .test_update import _UpdateFromTestBase
class ToMetaDataTest(fixtures.TestBase):
@@ -1675,7 +1681,7 @@ class DefaultTest(fixtures.TestBase):
eq_(conn.execute(select(table)).first(), (5, 12))
-class DMLTest(fixtures.TestBase, AssertsCompiledSQL):
+class DMLTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
__dialect__ = "default"
def test_insert_inline_kw_defaults(self):
@@ -1770,6 +1776,259 @@ class DMLTest(fixtures.TestBase, AssertsCompiledSQL):
stmt, "UPDATE foo SET bar=%s LIMIT 10", dialect="mysql"
)
+ def test_update_whereclause(self):
+ table1 = table(
+ "mytable", Column("myid", Integer), Column("name", String(30)),
+ )
+
+ with testing.expect_deprecated_20(
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.update(table1.c.myid == 7),
+ "UPDATE mytable SET myid=:myid, name=:name "
+ "WHERE mytable.myid = :myid_1",
+ )
+
+ def test_update_values(self):
+ table1 = table(
+ "mytable", Column("myid", Integer), Column("name", String(30)),
+ )
+
+ with testing.expect_deprecated_20(
+ "The update.values parameter will be removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.update(values={table1.c.myid: 7}),
+ "UPDATE mytable SET myid=:myid",
+ )
+
+ def test_delete_whereclause(self):
+ table1 = table("mytable", Column("myid", Integer),)
+
+ with testing.expect_deprecated_20(
+ "The delete.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.delete(table1.c.myid == 7),
+ "DELETE FROM mytable WHERE mytable.myid = :myid_1",
+ )
+
+ def test_update_ordered_parameters_fire_onupdate(self):
+ table = self.tables.update_w_default
+
+ values = [(table.c.y, table.c.x + 5), ("x", 10)]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ self.assert_compile(
+ table.update(preserve_parameter_order=True).values(values),
+ "UPDATE update_w_default "
+ "SET ycol=(update_w_default.x + :x_1), "
+ "x=:x, data=:data",
+ )
+
+ def test_update_ordered_parameters_override_onupdate(self):
+ table = self.tables.update_w_default
+
+ values = [
+ (table.c.y, table.c.x + 5),
+ (table.c.data, table.c.x + 10),
+ ("x", 10),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ self.assert_compile(
+ table.update(preserve_parameter_order=True).values(values),
+ "UPDATE update_w_default "
+ "SET ycol=(update_w_default.x + :x_1), "
+ "data=(update_w_default.x + :x_2), x=:x",
+ )
+
+ def test_update_ordered_parameters_oldstyle_1(self):
+ table1 = self.tables.mytable
+
+ # Confirm that we can pass values as list value pairs
+ # note these are ordered *differently* from table.c
+ values = [
+ (table1.c.name, table1.c.name + "lala"),
+ (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0.",
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0",
+ "The update.values parameter will be removed in SQLAlchemy 2.0",
+ ):
+ self.assert_compile(
+ update(
+ table1,
+ (table1.c.myid == func.hoho(4))
+ & (
+ table1.c.name
+ == literal("foo") + table1.c.name + literal("lala")
+ ),
+ preserve_parameter_order=True,
+ values=values,
+ ),
+ "UPDATE mytable "
+ "SET "
+ "name=(mytable.name || :name_1), "
+ "myid=do_stuff(mytable.myid, :param_1) "
+ "WHERE "
+ "mytable.myid = hoho(:hoho_1) AND "
+ "mytable.name = :param_2 || mytable.name || :param_3",
+ )
+
+ def test_update_ordered_parameters_oldstyle_2(self):
+ table1 = self.tables.mytable
+
+ # Confirm that we can pass values as list value pairs
+ # note these are ordered *differently* from table.c
+ values = [
+ (table1.c.name, table1.c.name + "lala"),
+ ("description", "some desc"),
+ (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0.",
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0",
+ ):
+ self.assert_compile(
+ update(
+ table1,
+ (table1.c.myid == func.hoho(4))
+ & (
+ table1.c.name
+ == literal("foo") + table1.c.name + literal("lala")
+ ),
+ preserve_parameter_order=True,
+ ).values(values),
+ "UPDATE mytable "
+ "SET "
+ "name=(mytable.name || :name_1), "
+ "description=:description, "
+ "myid=do_stuff(mytable.myid, :param_1) "
+ "WHERE "
+ "mytable.myid = hoho(:hoho_1) AND "
+ "mytable.name = :param_2 || mytable.name || :param_3",
+ )
+
+ def test_update_preserve_order_reqs_listtups(self):
+ table1 = self.tables.mytable
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ testing.assert_raises_message(
+ ValueError,
+ r"When preserve_parameter_order is True, values\(\) "
+ r"only accepts a list of 2-tuples",
+ table1.update(preserve_parameter_order=True).values,
+ {"description": "foo", "name": "bar"},
+ )
+
+ @testing.fixture
+ def randomized_param_order_update(self):
+ from sqlalchemy.sql.dml import UpdateDMLState
+
+ super_process_ordered_values = UpdateDMLState._process_ordered_values
+
+ # this fixture is needed for Python 3.6 and above to work around
+ # dictionaries being insert-ordered. in python 2.7 the previous
+ # logic fails pretty easily without this fixture.
+ def _process_ordered_values(self, statement):
+ super_process_ordered_values(self, statement)
+
+ tuples = list(self._dict_parameters.items())
+ random.shuffle(tuples)
+ self._dict_parameters = dict(tuples)
+
+ dialect = default.StrCompileDialect()
+ dialect.paramstyle = "qmark"
+ dialect.positional = True
+
+ with mock.patch.object(
+ UpdateDMLState, "_process_ordered_values", _process_ordered_values
+ ):
+ yield
+
+ def random_update_order_parameters():
+ from sqlalchemy import ARRAY
+
+ t = table(
+ "foo",
+ column("data1", ARRAY(Integer)),
+ column("data2", ARRAY(Integer)),
+ column("data3", ARRAY(Integer)),
+ column("data4", ARRAY(Integer)),
+ )
+
+ idx_to_value = [
+ (t.c.data1, 5, 7),
+ (t.c.data2, 10, 18),
+ (t.c.data3, 8, 4),
+ (t.c.data4, 12, 14),
+ ]
+
+ def combinations():
+ while True:
+ random.shuffle(idx_to_value)
+ yield list(idx_to_value)
+
+ return testing.combinations(
+ *[
+ (t, combination)
+ for i, combination in zip(range(10), combinations())
+ ],
+ argnames="t, idx_to_value"
+ )
+
+ @random_update_order_parameters()
+ def test_update_to_expression_ppo(
+ self, randomized_param_order_update, t, idx_to_value
+ ):
+ dialect = default.StrCompileDialect()
+ dialect.paramstyle = "qmark"
+ dialect.positional = True
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ stmt = t.update(preserve_parameter_order=True).values(
+ [(col[idx], val) for col, idx, val in idx_to_value]
+ )
+
+ self.assert_compile(
+ stmt,
+ "UPDATE foo SET %s"
+ % (
+ ", ".join(
+ "%s[?]=?" % col.key for col, idx, val in idx_to_value
+ )
+ ),
+ dialect=dialect,
+ checkpositional=tuple(
+ itertools.chain.from_iterable(
+ (idx, val) for col, idx, val in idx_to_value
+ )
+ ),
+ )
+
class TableDeprecationTest(fixtures.TestBase):
def test_mustexists(self):
diff --git a/test/sql/test_external_traversal.py b/test/sql/test_external_traversal.py
index c67b45203..970c39cef 100644
--- a/test/sql/test_external_traversal.py
+++ b/test/sql/test_external_traversal.py
@@ -625,7 +625,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
assert sql_util.ClauseAdapter(u).traverse(t1) is u
- def test_binds(self):
+ def test_bindparams(self):
"""test that unique bindparams change their name upon clone()
to prevent conflicts"""
@@ -1513,7 +1513,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
- vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)),
+ vis.traverse(case((t1.c.col1 == 5, t1.c.col2), else_=t1.c.col1)),
"CASE WHEN (t1alias.col1 = :col1_1) THEN "
"t1alias.col2 ELSE t1alias.col1 END",
)
@@ -1523,7 +1523,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
vis.traverse(
- case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1)
+ case((5, t1.c.col2), value=t1.c.col1, else_=t1.c.col1)
),
"CASE t1alias.col1 WHEN :param_1 THEN "
"t1alias.col2 ELSE t1alias.col1 END",
@@ -2152,7 +2152,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
"VALUES (:col1, :col2, :col3)",
)
- i2 = t1.insert(prefixes=["squiznart"])
+ i2 = t1.insert().prefix_with("squiznart")
self.assert_compile(
i2,
"INSERT squiznart INTO table1 (col1, col2, col3) "
@@ -2223,7 +2223,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
)
def test_inline_values_single(self):
- i = t1.insert(values={"col1": 5})
+ i = t1.insert().values({"col1": 5})
compile_state = i._compile_state_factory(i, None)
@@ -2231,7 +2231,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
is_(compile_state._has_multi_parameters, False)
def test_inline_values_multi(self):
- i = t1.insert(values=[{"col1": 5}, {"col1": 6}])
+ i = t1.insert().values([{"col1": 5}, {"col1": 6}])
compile_state = i._compile_state_factory(i, None)
@@ -2354,7 +2354,7 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
)
def test_update_no_support_multi_constructor(self):
- stmt = t1.update(values=[{"col1": 5}, {"col1": 7}])
+ stmt = t1.update().values([{"col1": 5}, {"col1": 7}])
assert_raises_message(
exc.InvalidRequestError,
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index 73954a8af..3c6140b81 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -990,30 +990,32 @@ class ExecuteTest(fixtures.TestBase):
Column("stuff", String(20), onupdate="thisisstuff"),
)
meta.create_all(connection)
- connection.execute(t.insert(values=dict(value=func.length("one"))))
+ connection.execute(t.insert().values(value=func.length("one")))
eq_(connection.execute(t.select()).first().value, 3)
- connection.execute(t.update(values=dict(value=func.length("asfda"))))
+ connection.execute(t.update().values(value=func.length("asfda")))
eq_(connection.execute(t.select()).first().value, 5)
r = connection.execute(
- t.insert(values=dict(value=func.length("sfsaafsda")))
+ t.insert().values(value=func.length("sfsaafsda"))
)
id_ = r.inserted_primary_key[0]
- eq_(connection.execute(t.select(t.c.id == id_)).first().value, 9)
- connection.execute(t.update(values={t.c.value: func.length("asdf")}))
+ eq_(
+ connection.execute(t.select().where(t.c.id == id_)).first().value,
+ 9,
+ )
+ connection.execute(t.update().values({t.c.value: func.length("asdf")}))
eq_(connection.execute(t.select()).first().value, 4)
connection.execute(t2.insert())
- connection.execute(t2.insert(values=dict(value=func.length("one"))))
+ connection.execute(t2.insert().values(value=func.length("one")))
connection.execute(
- t2.insert(values=dict(value=func.length("asfda") + -19)),
- stuff="hi",
+ t2.insert().values(value=func.length("asfda") + -19), stuff="hi",
)
res = sorted(connection.execute(select(t2.c.value, t2.c.stuff)))
eq_(res, [(-14, "hi"), (3, None), (7, None)])
connection.execute(
- t2.update(values=dict(value=func.length("asdsafasd"))),
+ t2.update().values(value=func.length("asdsafasd")),
stuff="some stuff",
)
eq_(
@@ -1023,23 +1025,18 @@ class ExecuteTest(fixtures.TestBase):
connection.execute(t2.delete())
- connection.execute(
- t2.insert(values=dict(value=func.length("one") + 8))
- )
+ connection.execute(t2.insert().values(value=func.length("one") + 8))
eq_(connection.execute(t2.select()).first().value, 11)
- connection.execute(t2.update(values=dict(value=func.length("asfda"))))
+ connection.execute(t2.update().values(value=func.length("asfda")))
eq_(
connection.execute(select(t2.c.value, t2.c.stuff)).first(),
(5, "thisisstuff"),
)
connection.execute(
- t2.update(
- values={
- t2.c.value: func.length("asfdaasdf"),
- t2.c.stuff: "foo",
- }
+ t2.update().values(
+ {t2.c.value: func.length("asfdaasdf"), t2.c.stuff: "foo"}
)
)
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index 3cdf291ec..541ffc4e9 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -164,7 +164,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams = {"myid": 3, "name": "jack"}
self.assert_compile(
- insert(table1, dict(myid=3, name="jack")),
+ insert(table1).values(myid=3, name="jack"),
"INSERT INTO mytable (myid, name) VALUES (:myid, :name)",
checkparams=checkparams,
)
@@ -194,7 +194,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams = {"myid": 3, "name": "jack", "unknowncol": "oops"}
- stmt = insert(table1, values=checkparams)
+ stmt = insert(table1).values(checkparams)
assert_raises_message(
exc.CompileError,
"Unconsumed column names: unknowncol",
@@ -210,7 +210,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
{"myid": 4, "name": "someone", "unknowncol": "oops"},
]
- stmt = insert(table1, values=checkparams)
+ stmt = insert(table1).values(checkparams)
assert_raises_message(
exc.CompileError,
"Unconsumed column names: unknowncol",
@@ -228,7 +228,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
}
self.assert_compile(
- insert(table1, (3, "jack", "mydescription")),
+ insert(table1).values([3, "jack", "mydescription"]),
"INSERT INTO mytable (myid, name, description) "
"VALUES (:myid, :name, :description)",
checkparams=checkparams,
@@ -238,7 +238,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- insert(table1, values=dict(myid=func.lala())),
+ insert(table1).values(myid=func.lala()),
"INSERT INTO mytable (myid) VALUES (lala())",
)
@@ -251,7 +251,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
}
self.assert_compile(
- insert(table1, values),
+ insert(table1).values(values),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
)
@@ -262,7 +262,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
values2 = {table1.c.name: bindparam("username")}
self.assert_compile(
- insert(table1, values=values1).values(values2),
+ insert(table1).values(values1).values(values2),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
)
diff --git a/test/sql/test_insert_exec.py b/test/sql/test_insert_exec.py
index c4266603e..45a8bccf5 100644
--- a/test/sql/test_insert_exec.py
+++ b/test/sql/test_insert_exec.py
@@ -36,23 +36,29 @@ class InsertExecTest(fixtures.TablesTest):
)
@testing.requires.multivalues_inserts
- def test_multivalues_insert(self):
+ def test_multivalues_insert(self, connection):
users = self.tables.users
- users.insert(
- values=[
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- ]
- ).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ connection.execute(
+ users.insert().values(
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ ]
+ )
+ )
+ rows = connection.execute(
+ users.select().order_by(users.c.user_id)
+ ).all()
eq_(rows[0], (7, "jack"))
eq_(rows[1], (8, "ed"))
- users.insert(values=[(9, "jack"), (10, "ed")]).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ connection.execute(users.insert().values([(9, "jack"), (10, "ed")]))
+ rows = connection.execute(
+ users.select().order_by(users.c.user_id)
+ ).all()
eq_(rows[2], (9, "jack"))
eq_(rows[3], (10, "ed"))
- def test_insert_heterogeneous_params(self):
+ def test_insert_heterogeneous_params(self, connection):
"""test that executemany parameters are asserted to match the
parameter set of the first."""
users = self.tables.users
@@ -63,16 +69,22 @@ class InsertExecTest(fixtures.TablesTest):
"bind parameter 'user_name', in "
"parameter group 2\n"
r"\[SQL: u?INSERT INTO users",
- users.insert().execute,
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9},
+ connection.execute,
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9},
+ ],
)
# this succeeds however. We aren't yet doing
# a length check on all subsequent parameters.
- users.insert().execute(
- {"user_id": 7}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9}
+ connection.execute(
+ users.insert(),
+ {"user_id": 7},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9},
)
def _test_lastrow_accessor(self, table_, values, assertvalues):
@@ -94,26 +106,29 @@ class InsertExecTest(fixtures.TablesTest):
):
is_(bool(comp.returning), True)
- result = engine.execute(table_.insert(), **values)
- ret = values.copy()
-
- for col, id_ in zip(
- table_.primary_key, result.inserted_primary_key
- ):
- ret[col.key] = id_
-
- if result.lastrow_has_defaults():
- criterion = and_(
- *[
- col == id_
- for col, id_ in zip(
- table_.primary_key, result.inserted_primary_key
- )
- ]
- )
- row = engine.execute(table_.select(criterion)).first()
- for c in table_.c:
- ret[c.key] = row._mapping[c]
+ with engine.begin() as connection:
+ result = connection.execute(table_.insert(), **values)
+ ret = values.copy()
+
+ for col, id_ in zip(
+ table_.primary_key, result.inserted_primary_key
+ ):
+ ret[col.key] = id_
+
+ if result.lastrow_has_defaults():
+ criterion = and_(
+ *[
+ col == id_
+ for col, id_ in zip(
+ table_.primary_key, result.inserted_primary_key
+ )
+ ]
+ )
+ row = connection.execute(
+ table_.select().where(criterion)
+ ).first()
+ for c in table_.c:
+ ret[c.key] = row._mapping[c]
return ret
if testing.against("firebird", "postgresql", "oracle", "mssql"):
@@ -275,15 +290,16 @@ class InsertExecTest(fixtures.TablesTest):
Column("x", Integer, primary_key=True),
Column("y", Integer),
)
- t.create(eng)
- r = eng.execute(t.insert().values(y=5))
- eq_(r.inserted_primary_key, (0,))
+ with eng.begin() as conn:
+ t.create(conn)
+ r = conn.execute(t.insert().values(y=5))
+ eq_(r.inserted_primary_key, (0,))
@testing.fails_on(
"sqlite", "sqlite autoincrement doesn't work with composite pks"
)
@testing.provide_metadata
- def test_misordered_lastrow(self):
+ def test_misordered_lastrow(self, connection):
metadata = self.metadata
related = Table(
@@ -312,37 +328,39 @@ class InsertExecTest(fixtures.TablesTest):
mariadb_engine="MyISAM",
)
- metadata.create_all()
- r = related.insert().values(id=12).execute()
+ metadata.create_all(connection)
+ r = connection.execute(related.insert().values(id=12))
id_ = r.inserted_primary_key[0]
eq_(id_, 12)
- r = t6.insert().values(manual_id=id_).execute()
+ r = connection.execute(t6.insert().values(manual_id=id_))
eq_(r.inserted_primary_key, (12, 1))
- def test_implicit_id_insert_select_columns(self):
+ def test_implicit_id_insert_select_columns(self, connection):
users = self.tables.users
stmt = users.insert().from_select(
(users.c.user_id, users.c.user_name),
users.select().where(users.c.user_id == 20),
)
- testing.db.execute(stmt)
+ r = connection.execute(stmt)
+ eq_(r.inserted_primary_key, (None,))
- def test_implicit_id_insert_select_keys(self):
+ def test_implicit_id_insert_select_keys(self, connection):
users = self.tables.users
stmt = users.insert().from_select(
["user_id", "user_name"],
users.select().where(users.c.user_id == 20),
)
- testing.db.execute(stmt)
+ r = connection.execute(stmt)
+ eq_(r.inserted_primary_key, (None,))
@testing.requires.empty_inserts
@testing.requires.returning
- def test_no_inserted_pk_on_returning(self):
+ def test_no_inserted_pk_on_returning(self, connection):
users = self.tables.users
- result = testing.db.execute(
+ result = connection.execute(
users.insert().returning(users.c.user_id, users.c.user_name)
)
assert_raises_message(
@@ -399,52 +417,60 @@ class TableInsertTest(fixtures.TablesTest):
return t
def _test(
- self, stmt, row, returning=None, inserted_primary_key=False, table=None
+ self,
+ connection,
+ stmt,
+ row,
+ returning=None,
+ inserted_primary_key=False,
+ table=None,
):
- with testing.db.connect() as conn:
- r = conn.execute(stmt)
+ r = connection.execute(stmt)
- if returning:
- returned = r.first()
- eq_(returned, returning)
- elif inserted_primary_key is not False:
- eq_(r.inserted_primary_key, inserted_primary_key)
+ if returning:
+ returned = r.first()
+ eq_(returned, returning)
+ elif inserted_primary_key is not False:
+ eq_(r.inserted_primary_key, inserted_primary_key)
- if table is None:
- table = self.tables.foo
+ if table is None:
+ table = self.tables.foo
- eq_(conn.execute(table.select()).first(), row)
+ eq_(connection.execute(table.select()).first(), row)
- def _test_multi(self, stmt, rows, data):
- testing.db.execute(stmt, rows)
+ def _test_multi(self, connection, stmt, rows, data):
+ connection.execute(stmt, rows)
eq_(
- testing.db.execute(
+ connection.execute(
self.tables.foo.select().order_by(self.tables.foo.c.id)
- ).fetchall(),
+ ).all(),
data,
)
@testing.requires.sequences
- def test_explicit_sequence(self):
+ def test_explicit_sequence(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(
id=func.next_value(Sequence("t_id_seq")), data="data", x=5
),
(testing.db.dialect.default_sequence_base, "data", 5),
)
- def test_uppercase(self):
+ def test_uppercase(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
)
- def test_uppercase_inline(self):
+ def test_uppercase_inline(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().inline().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
@@ -454,64 +480,71 @@ class TableInsertTest(fixtures.TablesTest):
"mssql+pyodbc",
"Pyodbc + SQL Server + Py3K, some decimal handling issue",
)
- def test_uppercase_inline_implicit(self):
+ def test_uppercase_inline_implicit(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().inline().values(data="data", x=5),
(1, "data", 5),
inserted_primary_key=(None,),
)
- def test_uppercase_implicit(self):
+ def test_uppercase_implicit(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(testing.db.dialect.default_sequence_base,),
)
- def test_uppercase_direct_params(self):
+ def test_uppercase_direct_params(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
)
@testing.requires.returning
- def test_uppercase_direct_params_returning(self):
+ def test_uppercase_direct_params_returning(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5).returning(t.c.id, t.c.x),
(1, "data", 5),
returning=(1, 5),
)
@testing.requires.sql_expressions_inserted_as_primary_key
- def test_sql_expr_lastrowid(self):
+ def test_sql_expr_lastrowid(self, connection):
# see also test.orm.test_unitofwork.py
# ClauseAttributesTest.test_insert_pk_expression
t = self.tables.foo_no_seq
self._test(
+ connection,
t.insert().values(id=literal(5) + 10, data="data", x=5),
(15, "data", 5),
inserted_primary_key=(15,),
table=self.tables.foo_no_seq,
)
- def test_direct_params(self):
+ def test_direct_params(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(),
)
@testing.requires.returning
- def test_direct_params_returning(self):
+ def test_direct_params_returning(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5).returning(t.c.id, t.c.x),
(testing.db.dialect.default_sequence_base, "data", 5),
returning=(testing.db.dialect.default_sequence_base, 5),
@@ -523,9 +556,10 @@ class TableInsertTest(fixtures.TablesTest):
# does not indicate the Sequence
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk(self):
+ def test_implicit_pk(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(),
@@ -533,9 +567,10 @@ class TableInsertTest(fixtures.TablesTest):
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk_multi_rows(self):
+ def test_implicit_pk_multi_rows(self, connection):
t = self._fixture()
self._test_multi(
+ connection,
t.insert(),
[
{"data": "d1", "x": 5},
@@ -547,9 +582,10 @@ class TableInsertTest(fixtures.TablesTest):
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk_inline(self):
+ def test_implicit_pk_inline(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().inline().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(),
diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py
index 731f8fcb5..e3fdff806 100644
--- a/test/sql/test_labels.py
+++ b/test/sql/test_labels.py
@@ -273,15 +273,20 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
# version) generate a subquery for limits/offsets. ensure that the
# generated result map corresponds to the selected table, not the
# select query
- s = table1.select(
- use_labels=True, order_by=[table1.c.this_is_the_primarykey_column]
- ).limit(2)
+ s = (
+ table1.select()
+ .apply_labels()
+ .order_by(table1.c.this_is_the_primarykey_column)
+ .limit(2)
+ )
self._assert_labeled_table1_select(s)
def test_result_map_subquery(self):
table1 = self.table1
- s = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ s = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
s2 = select(s)
compiled = s2.compile(dialect=self._length_fixture())
@@ -302,7 +307,11 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
table1 = self.table1
dialect = self._length_fixture()
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
s = select(q).apply_labels()
self.assert_compile(
@@ -348,7 +357,7 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
def test_column_bind_labels_1(self):
table1 = self.table1
- s = table1.select(table1.c.this_is_the_primarykey_column == 4)
+ s = table1.select().where(table1.c.this_is_the_primarykey_column == 4)
self.assert_compile(
s,
"SELECT some_large_named_table.this_is_the_primarykey_column, "
@@ -375,7 +384,7 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL):
def test_column_bind_labels_2(self):
table1 = self.table1
- s = table1.select(
+ s = table1.select().where(
or_(
table1.c.this_is_the_primarykey_column == 4,
table1.c.this_is_the_primarykey_column == 2,
@@ -473,8 +482,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_1(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
compile_dialect = default.DefaultDialect(label_length=10)
@@ -501,8 +512,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_2(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
@@ -531,8 +544,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
table1 = self.table1
compile_dialect = default.DefaultDialect(label_length=4)
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
@@ -559,7 +574,11 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_4(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
x = select(q).apply_labels()
compile_dialect = default.DefaultDialect(label_length=10)
@@ -586,7 +605,11 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_5(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
x = select(q).apply_labels()
compile_dialect = default.DefaultDialect(label_length=4)
@@ -615,7 +638,8 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
table1 = self.table1
q = (
- table1.select(table1.c.this_is_the_primarykey_column == 4)
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
.apply_labels()
.alias("foo")
)
@@ -642,8 +666,10 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
def test_adjustable_result_schema_column_2(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index 2fb50f37d..26d4969c8 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -370,7 +370,7 @@ class ReturnDefaultsTest(fixtures.TablesTest):
def test_arg_insert_pk(self, connection):
t1 = self.tables.t1
result = connection.execute(
- t1.insert(return_defaults=[t1.c.insdef]).values(upddef=1)
+ t1.insert().return_defaults(t1.c.insdef).values(upddef=1)
)
eq_(
[
@@ -394,7 +394,7 @@ class ReturnDefaultsTest(fixtures.TablesTest):
t1 = self.tables.t1
connection.execute(t1.insert().values(upddef=1))
result = connection.execute(
- t1.update(return_defaults=[t1.c.upddef]).values(data="d1")
+ t1.update().return_defaults(t1.c.upddef).values(data="d1")
)
eq_(
[result.returned_defaults._mapping[k] for k in (t1.c.upddef,)], [1]
diff --git a/test/sql/test_update.py b/test/sql/test_update.py
index a5e57a78a..8be5868db 100644
--- a/test/sql/test_update.py
+++ b/test/sql/test_update.py
@@ -195,13 +195,12 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
# test against a straight text subquery
- u = update(
- table1,
- values={
+ u = update(table1).values(
+ {
table1.c.name: text(
"(select name from mytable where id=mytable.id)"
)
- },
+ }
)
self.assert_compile(
u,
@@ -213,13 +212,12 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
mt = table1.alias()
- u = update(
- table1,
- values={
+ u = update(table1).values(
+ {
table1.c.name: select(mt.c.name)
.where(mt.c.myid == table1.c.myid)
.scalar_subquery()
- },
+ }
)
self.assert_compile(
u,
@@ -238,7 +236,11 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
.where(table2.c.otherid == table1.c.myid)
.scalar_subquery()
)
- u = update(table1, table1.c.name == "jack", values={table1.c.name: s})
+ u = (
+ update(table1)
+ .where(table1.c.name == "jack")
+ .values({table1.c.name: s})
+ )
self.assert_compile(
u,
"UPDATE mytable SET name=(SELECT myothertable.otherid, "
@@ -253,7 +255,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test a non-correlated WHERE clause
s = select(table2.c.othername).where(table2.c.otherid == 7)
- u = update(table1, table1.c.name == s.scalar_subquery())
+ u = update(table1).where(table1.c.name == s.scalar_subquery())
self.assert_compile(
u,
"UPDATE mytable SET myid=:myid, name=:name, "
@@ -268,7 +270,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
# test one that is actually correlated...
s = select(table2.c.othername).where(table2.c.otherid == table1.c.myid)
- u = table1.update(table1.c.name == s.scalar_subquery())
+ u = table1.update().where(table1.c.name == s.scalar_subquery())
self.assert_compile(
u,
"UPDATE mytable SET myid=:myid, name=:name, "
@@ -420,7 +422,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 7),
+ update(table1).where(table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params={table1.c.name: "fred"},
)
@@ -440,7 +442,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 7),
+ update(table1).where(table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params={"name": "fred"},
)
@@ -449,7 +451,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, values={table1.c.name: table1.c.myid}),
+ update(table1).values({table1.c.name: table1.c.myid}),
"UPDATE mytable SET name=mytable.myid",
)
@@ -457,11 +459,9 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(
- table1,
- whereclause=table1.c.name == bindparam("crit"),
- values={table1.c.name: "hi"},
- ),
+ update(table1)
+ .where(table1.c.name == bindparam("crit"))
+ .values({table1.c.name: "hi"},),
"UPDATE mytable SET name=:name WHERE mytable.name = :crit",
params={"crit": "notthere"},
checkparams={"crit": "notthere", "name": "hi"},
@@ -471,11 +471,9 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(
- table1,
- table1.c.myid == 12,
- values={table1.c.name: table1.c.myid},
- ),
+ update(table1)
+ .where(table1.c.myid == 12)
+ .values({table1.c.name: table1.c.myid},),
"UPDATE mytable "
"SET name=mytable.myid, description=:description "
"WHERE mytable.myid = :myid_1",
@@ -487,7 +485,9 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 12, values={table1.c.myid: 9}),
+ update(table1)
+ .where(table1.c.myid == 12)
+ .values({table1.c.myid: 9}),
"UPDATE mytable "
"SET myid=:myid, description=:description "
"WHERE mytable.myid = :myid_1",
@@ -498,7 +498,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 12),
+ update(table1).where(table1.c.myid == 12),
"UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
params={"myid": 18},
checkparams={"myid": 18, "myid_1": 12},
@@ -507,7 +507,11 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
def test_update_9(self):
table1 = self.tables.mytable
- s = table1.update(table1.c.myid == 12, values={table1.c.name: "lala"})
+ s = (
+ table1.update()
+ .where(table1.c.myid == 12)
+ .values({table1.c.name: "lala"})
+ )
c = s.compile(column_keys=["id", "name"])
eq_(str(s), str(c))
@@ -517,7 +521,7 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
v1 = {table1.c.name: table1.c.myid}
v2 = {table1.c.name: table1.c.name + "foo"}
self.assert_compile(
- update(table1, table1.c.myid == 12, values=v1).values(v2),
+ update(table1).where(table1.c.myid == 12).values(v1).values(v2),
"UPDATE mytable "
"SET "
"name=(mytable.name || :name_1), "
@@ -535,15 +539,15 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
}
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
- ),
- values=values,
- ),
+ )
+ )
+ .values(values),
"UPDATE mytable "
"SET "
"myid=do_stuff(mytable.myid, :param_1), "
@@ -586,35 +590,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
column_keys=["j"],
)
- def test_update_ordered_parameters_oldstyle_1(self):
- table1 = self.tables.mytable
-
- # Confirm that we can pass values as list value pairs
- # note these are ordered *differently* from table.c
- values = [
- (table1.c.name, table1.c.name + "lala"),
- (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
- ]
- self.assert_compile(
- update(
- table1,
- (table1.c.myid == func.hoho(4))
- & (
- table1.c.name
- == literal("foo") + table1.c.name + literal("lala")
- ),
- preserve_parameter_order=True,
- values=values,
- ),
- "UPDATE mytable "
- "SET "
- "name=(mytable.name || :name_1), "
- "myid=do_stuff(mytable.myid, :param_1) "
- "WHERE "
- "mytable.myid = hoho(:hoho_1) AND "
- "mytable.name = :param_2 || mytable.name || :param_3",
- )
-
def test_update_ordered_parameters_newstyle_1(self):
table1 = self.tables.mytable
@@ -643,36 +618,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
"mytable.name = :param_2 || mytable.name || :param_3",
)
- def test_update_ordered_parameters_oldstyle_2(self):
- table1 = self.tables.mytable
-
- # Confirm that we can pass values as list value pairs
- # note these are ordered *differently* from table.c
- values = [
- (table1.c.name, table1.c.name + "lala"),
- ("description", "some desc"),
- (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
- ]
- self.assert_compile(
- update(
- table1,
- (table1.c.myid == func.hoho(4))
- & (
- table1.c.name
- == literal("foo") + table1.c.name + literal("lala")
- ),
- preserve_parameter_order=True,
- ).values(values),
- "UPDATE mytable "
- "SET "
- "name=(mytable.name || :name_1), "
- "description=:description, "
- "myid=do_stuff(mytable.myid, :param_1) "
- "WHERE "
- "mytable.myid = hoho(:hoho_1) AND "
- "mytable.name = :param_2 || mytable.name || :param_3",
- )
-
def test_update_ordered_parameters_newstyle_2(self):
table1 = self.tables.mytable
@@ -684,14 +629,15 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
(table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
]
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
),
- ).ordered_values(*values),
+ )
+ .ordered_values(*values),
"UPDATE mytable "
"SET "
"name=(mytable.name || :name_1), "
@@ -741,42 +687,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
stmt.compile,
)
- def test_update_ordered_parameters_fire_onupdate(self):
- table = self.tables.update_w_default
-
- values = [(table.c.y, table.c.x + 5), ("x", 10)]
-
- self.assert_compile(
- table.update(preserve_parameter_order=True).values(values),
- "UPDATE update_w_default SET ycol=(update_w_default.x + :x_1), "
- "x=:x, data=:data",
- )
-
- def test_update_ordered_parameters_override_onupdate(self):
- table = self.tables.update_w_default
-
- values = [
- (table.c.y, table.c.x + 5),
- (table.c.data, table.c.x + 10),
- ("x", 10),
- ]
-
- self.assert_compile(
- table.update(preserve_parameter_order=True).values(values),
- "UPDATE update_w_default SET ycol=(update_w_default.x + :x_1), "
- "data=(update_w_default.x + :x_2), x=:x",
- )
-
- def test_update_preserve_order_reqs_listtups(self):
- table1 = self.tables.mytable
- testing.assert_raises_message(
- ValueError,
- r"When preserve_parameter_order is True, values\(\) "
- r"only accepts a list of 2-tuples",
- table1.update(preserve_parameter_order=True).values,
- {"description": "foo", "name": "bar"},
- )
-
def test_update_ordereddict(self):
table1 = self.tables.mytable
@@ -790,15 +700,15 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
)
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
),
- values=values,
- ),
+ )
+ .values(values),
"UPDATE mytable "
"SET "
"myid=do_stuff(mytable.myid, :param_1), "
@@ -954,35 +864,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
),
)
- @random_update_order_parameters()
- def test_update_to_expression_ppo(
- self, randomized_param_order_update, t, idx_to_value
- ):
- dialect = default.StrCompileDialect()
- dialect.paramstyle = "qmark"
- dialect.positional = True
-
- # deprecated pattern here
- stmt = t.update(preserve_parameter_order=True).values(
- [(col[idx], val) for col, idx, val in idx_to_value]
- )
-
- self.assert_compile(
- stmt,
- "UPDATE foo SET %s"
- % (
- ", ".join(
- "%s[?]=?" % col.key for col, idx, val in idx_to_value
- )
- ),
- dialect=dialect,
- checkpositional=tuple(
- itertools.chain.from_iterable(
- (idx, val) for col, idx, val in idx_to_value
- )
- ),
- )
-
def test_update_to_expression_three(self):
# this test is from test_defaults but exercises a particular
# parameter ordering issue
@@ -1076,9 +957,9 @@ class UpdateFromCompileTest(
# against the alias, but we name the table-bound column
# in values. The behavior here isn't really defined
self.assert_compile(
- update(talias1, talias1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(talias1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1 "
"SET name=:name "
"WHERE t1.myid = :myid_1",
@@ -1093,9 +974,9 @@ class UpdateFromCompileTest(
# which is causing the "table1.c.name" param to be handled
# as an "extra table", hence we see the full table name rendered.
self.assert_compile(
- update(talias1, table1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(table1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1 "
"SET name=:mytable_name "
"FROM mytable "
@@ -1108,9 +989,9 @@ class UpdateFromCompileTest(
talias1 = table1.alias("t1")
self.assert_compile(
- update(talias1, table1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(table1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1, mytable SET mytable.name=%s "
"WHERE mytable.myid = %s",
checkparams={"mytable_name": "fred", "myid_1": 7},
@@ -1212,9 +1093,13 @@ class UpdateFromCompileTest(
checkparams = {"email_address_1": "e1", "id_1": 7, "name": "newname"}
- cols = [addresses.c.id, addresses.c.user_id, addresses.c.email_address]
-
- subq = select(cols).where(addresses.c.id == 7).alias()
+ subq = (
+ select(
+ addresses.c.id, addresses.c.user_id, addresses.c.email_address
+ )
+ .where(addresses.c.id == 7)
+ .alias()
+ )
self.assert_compile(
users.update()
.values(name="newname")