summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2021-04-28 18:31:51 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2021-04-29 14:43:09 -0400
commitaba308868544b21bafa0b3435701ddc908654b0a (patch)
tree9160bdeacf66b4227e73203f7bb81a074d463927 /lib/sqlalchemy/testing
parent5b12393e81f6b8953e9ebd46801e6943007b7a56 (diff)
downloadsqlalchemy-aba308868544b21bafa0b3435701ddc908654b0a.tar.gz
Use non-subquery form for empty IN
Revised the "EMPTY IN" expression to no longer rely upon using a subquery, as this was causing some compatibility and performance problems. The new approach for selected databases takes advantage of using a NULL-returning IN expression combined with the usual "1 != 1" or "1 = 1" expression appended by AND or OR. The expression is now the default for all backends other than SQLite, which still had some compatibility issues regarding tuple "IN" for older SQLite versions. Third party dialects can still override how the "empty set" expression renders by implementing a new compiler method ``def visit_empty_set_op_expr(self, type_, expand_op)``, which takes precedence over the existing ``def visit_empty_set_expr(self, element_types)`` which remains in place. Fixes: #6258 Fixes: #6397 Change-Id: I2df09eb00d2ad3b57039ae48128fdf94641b5e59
Diffstat (limited to 'lib/sqlalchemy/testing')
-rw-r--r--lib/sqlalchemy/testing/suite/test_select.py187
1 files changed, 135 insertions, 52 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_select.py b/lib/sqlalchemy/testing/suite/test_select.py
index 7b35dc3fa..1614acd3d 100644
--- a/lib/sqlalchemy/testing/suite/test_select.py
+++ b/lib/sqlalchemy/testing/suite/test_select.py
@@ -1016,163 +1016,246 @@ class ExpandingBoundInTest(fixtures.TablesTest):
with config.db.connect() as conn:
eq_(conn.execute(select, params).fetchall(), result)
- def test_multiple_empty_sets(self):
+ def test_multiple_empty_sets_bindparam(self):
# test that any anonymous aliasing used by the dialect
# is fine with duplicates
table = self.tables.some_table
-
stmt = (
select(table.c.id)
- .where(table.c.x.in_(bindparam("q", expanding=True)))
- .where(table.c.y.in_(bindparam("p", expanding=True)))
+ .where(table.c.x.in_(bindparam("q")))
+ .where(table.c.y.in_(bindparam("p")))
.order_by(table.c.id)
)
-
self._assert_result(stmt, [], params={"q": [], "p": []})
- @testing.requires.tuple_in_w_empty
- def test_empty_heterogeneous_tuples(self):
+ def test_multiple_empty_sets_direct(self):
+ # test that any anonymous aliasing used by the dialect
+ # is fine with duplicates
table = self.tables.some_table
-
stmt = (
select(table.c.id)
- .where(
- tuple_(table.c.x, table.c.z).in_(
- bindparam("q", expanding=True)
- )
- )
+ .where(table.c.x.in_([]))
+ .where(table.c.y.in_([]))
.order_by(table.c.id)
)
+ self._assert_result(stmt, [])
+ @testing.requires.tuple_in_w_empty
+ def test_empty_heterogeneous_tuples_bindparam(self):
+ table = self.tables.some_table
+ stmt = (
+ select(table.c.id)
+ .where(tuple_(table.c.x, table.c.z).in_(bindparam("q")))
+ .order_by(table.c.id)
+ )
self._assert_result(stmt, [], params={"q": []})
@testing.requires.tuple_in_w_empty
- def test_empty_homogeneous_tuples(self):
+ def test_empty_heterogeneous_tuples_direct(self):
table = self.tables.some_table
+ def go(val, expected):
+ stmt = (
+ select(table.c.id)
+ .where(tuple_(table.c.x, table.c.z).in_(val))
+ .order_by(table.c.id)
+ )
+ self._assert_result(stmt, expected)
+
+ go([], [])
+ go([(2, "z2"), (3, "z3"), (4, "z4")], [(2,), (3,), (4,)])
+ go([], [])
+
+ @testing.requires.tuple_in_w_empty
+ def test_empty_homogeneous_tuples_bindparam(self):
+ table = self.tables.some_table
stmt = (
select(table.c.id)
- .where(
- tuple_(table.c.x, table.c.y).in_(
- bindparam("q", expanding=True)
- )
- )
+ .where(tuple_(table.c.x, table.c.y).in_(bindparam("q")))
.order_by(table.c.id)
)
-
self._assert_result(stmt, [], params={"q": []})
- def test_bound_in_scalar(self):
+ @testing.requires.tuple_in_w_empty
+ def test_empty_homogeneous_tuples_direct(self):
table = self.tables.some_table
+ def go(val, expected):
+ stmt = (
+ select(table.c.id)
+ .where(tuple_(table.c.x, table.c.y).in_(val))
+ .order_by(table.c.id)
+ )
+ self._assert_result(stmt, expected)
+
+ go([], [])
+ go([(1, 2), (2, 3), (3, 4)], [(1,), (2,), (3,)])
+ go([], [])
+
+ def test_bound_in_scalar_bindparam(self):
+ table = self.tables.some_table
stmt = (
select(table.c.id)
- .where(table.c.x.in_(bindparam("q", expanding=True)))
+ .where(table.c.x.in_(bindparam("q")))
.order_by(table.c.id)
)
-
self._assert_result(stmt, [(2,), (3,), (4,)], params={"q": [2, 3, 4]})
- @testing.requires.tuple_in
- def test_bound_in_two_tuple(self):
+ def test_bound_in_scalar_direct(self):
table = self.tables.some_table
-
stmt = (
select(table.c.id)
- .where(
- tuple_(table.c.x, table.c.y).in_(
- bindparam("q", expanding=True)
- )
- )
+ .where(table.c.x.in_([2, 3, 4]))
.order_by(table.c.id)
)
+ self._assert_result(stmt, [(2,), (3,), (4,)])
+ @testing.requires.tuple_in
+ def test_bound_in_two_tuple_bindparam(self):
+ table = self.tables.some_table
+ stmt = (
+ select(table.c.id)
+ .where(tuple_(table.c.x, table.c.y).in_(bindparam("q")))
+ .order_by(table.c.id)
+ )
self._assert_result(
stmt, [(2,), (3,), (4,)], params={"q": [(2, 3), (3, 4), (4, 5)]}
)
@testing.requires.tuple_in
- def test_bound_in_heterogeneous_two_tuple(self):
+ def test_bound_in_two_tuple_direct(self):
+ table = self.tables.some_table
+ stmt = (
+ select(table.c.id)
+ .where(tuple_(table.c.x, table.c.y).in_([(2, 3), (3, 4), (4, 5)]))
+ .order_by(table.c.id)
+ )
+ self._assert_result(stmt, [(2,), (3,), (4,)])
+
+ @testing.requires.tuple_in
+ def test_bound_in_heterogeneous_two_tuple_bindparam(self):
table = self.tables.some_table
+ stmt = (
+ select(table.c.id)
+ .where(tuple_(table.c.x, table.c.z).in_(bindparam("q")))
+ .order_by(table.c.id)
+ )
+ self._assert_result(
+ stmt,
+ [(2,), (3,), (4,)],
+ params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
+ )
+ @testing.requires.tuple_in
+ def test_bound_in_heterogeneous_two_tuple_direct(self):
+ table = self.tables.some_table
stmt = (
select(table.c.id)
.where(
tuple_(table.c.x, table.c.z).in_(
- bindparam("q", expanding=True)
+ [(2, "z2"), (3, "z3"), (4, "z4")]
)
)
.order_by(table.c.id)
)
-
self._assert_result(
stmt,
[(2,), (3,), (4,)],
- params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
@testing.requires.tuple_in
- def test_bound_in_heterogeneous_two_tuple_text(self):
+ def test_bound_in_heterogeneous_two_tuple_text_bindparam(self):
+ # note this becomes ARRAY if we dont use expanding
+ # explicitly right now
stmt = text(
"select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
).bindparams(bindparam("q", expanding=True))
-
self._assert_result(
stmt,
[(2,), (3,), (4,)],
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
- def test_empty_set_against_integer(self):
+ def test_empty_set_against_integer_bindparam(self):
table = self.tables.some_table
-
stmt = (
select(table.c.id)
- .where(table.c.x.in_(bindparam("q", expanding=True)))
+ .where(table.c.x.in_(bindparam("q")))
.order_by(table.c.id)
)
-
self._assert_result(stmt, [], params={"q": []})
- def test_empty_set_against_integer_negation(self):
+ def test_empty_set_against_integer_direct(self):
table = self.tables.some_table
+ stmt = select(table.c.id).where(table.c.x.in_([])).order_by(table.c.id)
+ self._assert_result(stmt, [])
+ def test_empty_set_against_integer_negation_bindparam(self):
+ table = self.tables.some_table
stmt = (
select(table.c.id)
- .where(table.c.x.not_in(bindparam("q", expanding=True)))
+ .where(table.c.x.not_in(bindparam("q")))
.order_by(table.c.id)
)
-
self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []})
- def test_empty_set_against_string(self):
+ def test_empty_set_against_integer_negation_direct(self):
table = self.tables.some_table
+ stmt = (
+ select(table.c.id).where(table.c.x.not_in([])).order_by(table.c.id)
+ )
+ self._assert_result(stmt, [(1,), (2,), (3,), (4,)])
+ def test_empty_set_against_string_bindparam(self):
+ table = self.tables.some_table
stmt = (
select(table.c.id)
- .where(table.c.z.in_(bindparam("q", expanding=True)))
+ .where(table.c.z.in_(bindparam("q")))
.order_by(table.c.id)
)
-
self._assert_result(stmt, [], params={"q": []})
- def test_empty_set_against_string_negation(self):
+ def test_empty_set_against_string_direct(self):
table = self.tables.some_table
+ stmt = select(table.c.id).where(table.c.z.in_([])).order_by(table.c.id)
+ self._assert_result(stmt, [])
+ def test_empty_set_against_string_negation_bindparam(self):
+ table = self.tables.some_table
stmt = (
select(table.c.id)
- .where(table.c.z.not_in(bindparam("q", expanding=True)))
+ .where(table.c.z.not_in(bindparam("q")))
.order_by(table.c.id)
)
-
self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []})
- def test_null_in_empty_set_is_false(self, connection):
+ def test_empty_set_against_string_negation_direct(self):
+ table = self.tables.some_table
+ stmt = (
+ select(table.c.id).where(table.c.z.not_in([])).order_by(table.c.id)
+ )
+ self._assert_result(stmt, [(1,), (2,), (3,), (4,)])
+
+ def test_null_in_empty_set_is_false_bindparam(self, connection):
+ stmt = select(
+ case(
+ [
+ (
+ null().in_(bindparam("foo", value=())),
+ true(),
+ )
+ ],
+ else_=false(),
+ )
+ )
+ in_(connection.execute(stmt).fetchone()[0], (False, 0))
+
+ def test_null_in_empty_set_is_false_direct(self, connection):
stmt = select(
case(
[
(
- null().in_(bindparam("foo", value=(), expanding=True)),
+ null().in_([]),
true(),
)
],