diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-04-28 18:31:51 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-04-29 14:43:09 -0400 |
| commit | aba308868544b21bafa0b3435701ddc908654b0a (patch) | |
| tree | 9160bdeacf66b4227e73203f7bb81a074d463927 /lib/sqlalchemy/testing | |
| parent | 5b12393e81f6b8953e9ebd46801e6943007b7a56 (diff) | |
| download | sqlalchemy-aba308868544b21bafa0b3435701ddc908654b0a.tar.gz | |
Use non-subquery form for empty IN
Revised the "EMPTY IN" expression to no longer rely upon using a subquery,
as this was causing some compatibility and performance problems. The new
approach for selected databases takes advantage of using a NULL-returning
IN expression combined with the usual "1 != 1" or "1 = 1" expression
appended by AND or OR. The expression is now the default for all backends
other than SQLite, which still had some compatibility issues regarding
tuple "IN" for older SQLite versions.
Third party dialects can still override how the "empty set" expression
renders by implementing a new compiler method
``def visit_empty_set_op_expr(self, type_, expand_op)``, which takes
precedence over the existing
``def visit_empty_set_expr(self, element_types)`` which remains in place.
Fixes: #6258
Fixes: #6397
Change-Id: I2df09eb00d2ad3b57039ae48128fdf94641b5e59
Diffstat (limited to 'lib/sqlalchemy/testing')
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_select.py | 187 |
1 files changed, 135 insertions, 52 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_select.py b/lib/sqlalchemy/testing/suite/test_select.py index 7b35dc3fa..1614acd3d 100644 --- a/lib/sqlalchemy/testing/suite/test_select.py +++ b/lib/sqlalchemy/testing/suite/test_select.py @@ -1016,163 +1016,246 @@ class ExpandingBoundInTest(fixtures.TablesTest): with config.db.connect() as conn: eq_(conn.execute(select, params).fetchall(), result) - def test_multiple_empty_sets(self): + def test_multiple_empty_sets_bindparam(self): # test that any anonymous aliasing used by the dialect # is fine with duplicates table = self.tables.some_table - stmt = ( select(table.c.id) - .where(table.c.x.in_(bindparam("q", expanding=True))) - .where(table.c.y.in_(bindparam("p", expanding=True))) + .where(table.c.x.in_(bindparam("q"))) + .where(table.c.y.in_(bindparam("p"))) .order_by(table.c.id) ) - self._assert_result(stmt, [], params={"q": [], "p": []}) - @testing.requires.tuple_in_w_empty - def test_empty_heterogeneous_tuples(self): + def test_multiple_empty_sets_direct(self): + # test that any anonymous aliasing used by the dialect + # is fine with duplicates table = self.tables.some_table - stmt = ( select(table.c.id) - .where( - tuple_(table.c.x, table.c.z).in_( - bindparam("q", expanding=True) - ) - ) + .where(table.c.x.in_([])) + .where(table.c.y.in_([])) .order_by(table.c.id) ) + self._assert_result(stmt, []) + @testing.requires.tuple_in_w_empty + def test_empty_heterogeneous_tuples_bindparam(self): + table = self.tables.some_table + stmt = ( + select(table.c.id) + .where(tuple_(table.c.x, table.c.z).in_(bindparam("q"))) + .order_by(table.c.id) + ) self._assert_result(stmt, [], params={"q": []}) @testing.requires.tuple_in_w_empty - def test_empty_homogeneous_tuples(self): + def test_empty_heterogeneous_tuples_direct(self): table = self.tables.some_table + def go(val, expected): + stmt = ( + select(table.c.id) + .where(tuple_(table.c.x, table.c.z).in_(val)) + .order_by(table.c.id) + ) + self._assert_result(stmt, expected) + + go([], []) + go([(2, "z2"), (3, "z3"), (4, "z4")], [(2,), (3,), (4,)]) + go([], []) + + @testing.requires.tuple_in_w_empty + def test_empty_homogeneous_tuples_bindparam(self): + table = self.tables.some_table stmt = ( select(table.c.id) - .where( - tuple_(table.c.x, table.c.y).in_( - bindparam("q", expanding=True) - ) - ) + .where(tuple_(table.c.x, table.c.y).in_(bindparam("q"))) .order_by(table.c.id) ) - self._assert_result(stmt, [], params={"q": []}) - def test_bound_in_scalar(self): + @testing.requires.tuple_in_w_empty + def test_empty_homogeneous_tuples_direct(self): table = self.tables.some_table + def go(val, expected): + stmt = ( + select(table.c.id) + .where(tuple_(table.c.x, table.c.y).in_(val)) + .order_by(table.c.id) + ) + self._assert_result(stmt, expected) + + go([], []) + go([(1, 2), (2, 3), (3, 4)], [(1,), (2,), (3,)]) + go([], []) + + def test_bound_in_scalar_bindparam(self): + table = self.tables.some_table stmt = ( select(table.c.id) - .where(table.c.x.in_(bindparam("q", expanding=True))) + .where(table.c.x.in_(bindparam("q"))) .order_by(table.c.id) ) - self._assert_result(stmt, [(2,), (3,), (4,)], params={"q": [2, 3, 4]}) - @testing.requires.tuple_in - def test_bound_in_two_tuple(self): + def test_bound_in_scalar_direct(self): table = self.tables.some_table - stmt = ( select(table.c.id) - .where( - tuple_(table.c.x, table.c.y).in_( - bindparam("q", expanding=True) - ) - ) + .where(table.c.x.in_([2, 3, 4])) .order_by(table.c.id) ) + self._assert_result(stmt, [(2,), (3,), (4,)]) + @testing.requires.tuple_in + def test_bound_in_two_tuple_bindparam(self): + table = self.tables.some_table + stmt = ( + select(table.c.id) + .where(tuple_(table.c.x, table.c.y).in_(bindparam("q"))) + .order_by(table.c.id) + ) self._assert_result( stmt, [(2,), (3,), (4,)], params={"q": [(2, 3), (3, 4), (4, 5)]} ) @testing.requires.tuple_in - def test_bound_in_heterogeneous_two_tuple(self): + def test_bound_in_two_tuple_direct(self): + table = self.tables.some_table + stmt = ( + select(table.c.id) + .where(tuple_(table.c.x, table.c.y).in_([(2, 3), (3, 4), (4, 5)])) + .order_by(table.c.id) + ) + self._assert_result(stmt, [(2,), (3,), (4,)]) + + @testing.requires.tuple_in + def test_bound_in_heterogeneous_two_tuple_bindparam(self): table = self.tables.some_table + stmt = ( + select(table.c.id) + .where(tuple_(table.c.x, table.c.z).in_(bindparam("q"))) + .order_by(table.c.id) + ) + self._assert_result( + stmt, + [(2,), (3,), (4,)], + params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]}, + ) + @testing.requires.tuple_in + def test_bound_in_heterogeneous_two_tuple_direct(self): + table = self.tables.some_table stmt = ( select(table.c.id) .where( tuple_(table.c.x, table.c.z).in_( - bindparam("q", expanding=True) + [(2, "z2"), (3, "z3"), (4, "z4")] ) ) .order_by(table.c.id) ) - self._assert_result( stmt, [(2,), (3,), (4,)], - params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]}, ) @testing.requires.tuple_in - def test_bound_in_heterogeneous_two_tuple_text(self): + def test_bound_in_heterogeneous_two_tuple_text_bindparam(self): + # note this becomes ARRAY if we dont use expanding + # explicitly right now stmt = text( "select id FROM some_table WHERE (x, z) IN :q ORDER BY id" ).bindparams(bindparam("q", expanding=True)) - self._assert_result( stmt, [(2,), (3,), (4,)], params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]}, ) - def test_empty_set_against_integer(self): + def test_empty_set_against_integer_bindparam(self): table = self.tables.some_table - stmt = ( select(table.c.id) - .where(table.c.x.in_(bindparam("q", expanding=True))) + .where(table.c.x.in_(bindparam("q"))) .order_by(table.c.id) ) - self._assert_result(stmt, [], params={"q": []}) - def test_empty_set_against_integer_negation(self): + def test_empty_set_against_integer_direct(self): table = self.tables.some_table + stmt = select(table.c.id).where(table.c.x.in_([])).order_by(table.c.id) + self._assert_result(stmt, []) + def test_empty_set_against_integer_negation_bindparam(self): + table = self.tables.some_table stmt = ( select(table.c.id) - .where(table.c.x.not_in(bindparam("q", expanding=True))) + .where(table.c.x.not_in(bindparam("q"))) .order_by(table.c.id) ) - self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []}) - def test_empty_set_against_string(self): + def test_empty_set_against_integer_negation_direct(self): table = self.tables.some_table + stmt = ( + select(table.c.id).where(table.c.x.not_in([])).order_by(table.c.id) + ) + self._assert_result(stmt, [(1,), (2,), (3,), (4,)]) + def test_empty_set_against_string_bindparam(self): + table = self.tables.some_table stmt = ( select(table.c.id) - .where(table.c.z.in_(bindparam("q", expanding=True))) + .where(table.c.z.in_(bindparam("q"))) .order_by(table.c.id) ) - self._assert_result(stmt, [], params={"q": []}) - def test_empty_set_against_string_negation(self): + def test_empty_set_against_string_direct(self): table = self.tables.some_table + stmt = select(table.c.id).where(table.c.z.in_([])).order_by(table.c.id) + self._assert_result(stmt, []) + def test_empty_set_against_string_negation_bindparam(self): + table = self.tables.some_table stmt = ( select(table.c.id) - .where(table.c.z.not_in(bindparam("q", expanding=True))) + .where(table.c.z.not_in(bindparam("q"))) .order_by(table.c.id) ) - self._assert_result(stmt, [(1,), (2,), (3,), (4,)], params={"q": []}) - def test_null_in_empty_set_is_false(self, connection): + def test_empty_set_against_string_negation_direct(self): + table = self.tables.some_table + stmt = ( + select(table.c.id).where(table.c.z.not_in([])).order_by(table.c.id) + ) + self._assert_result(stmt, [(1,), (2,), (3,), (4,)]) + + def test_null_in_empty_set_is_false_bindparam(self, connection): + stmt = select( + case( + [ + ( + null().in_(bindparam("foo", value=())), + true(), + ) + ], + else_=false(), + ) + ) + in_(connection.execute(stmt).fetchone()[0], (False, 0)) + + def test_null_in_empty_set_is_false_direct(self, connection): stmt = select( case( [ ( - null().in_(bindparam("foo", value=(), expanding=True)), + null().in_([]), true(), ) ], |
