summaryrefslogtreecommitdiff
path: root/test/orm/test_query.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm/test_query.py')
-rw-r--r--test/orm/test_query.py105
1 files changed, 100 insertions, 5 deletions
diff --git a/test/orm/test_query.py b/test/orm/test_query.py
index 354bbe5b1..a2a1ee096 100644
--- a/test/orm/test_query.py
+++ b/test/orm/test_query.py
@@ -17,6 +17,9 @@ from sqlalchemy.testing.assertions import (
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assert_warnings
from test.orm import _fixtures
from sqlalchemy.orm.util import join, with_parent
+import contextlib
+from sqlalchemy.testing import mock, is_, is_not_
+from sqlalchemy import inspect
class QueryTest(_fixtures.FixtureTest):
@@ -1484,7 +1487,6 @@ class SliceTest(QueryTest):
assert create_session().query(User).filter(User.id == 27). \
first() is None
- @testing.only_on('sqlite', 'testing execution but db-specific syntax')
def test_limit_offset_applies(self):
"""Test that the expected LIMIT/OFFSET is applied for slices.
@@ -1510,15 +1512,15 @@ class SliceTest(QueryTest):
testing.db, lambda: q[:20], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
- {'param_1': 20, 'param_2': 0})])
+ "AS users_name FROM users LIMIT :param_1",
+ {'param_1': 20})])
self.assert_sql(
testing.db, lambda: q[5:], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
- {'param_1': -1, 'param_2': 5})])
+ "AS users_name FROM users LIMIT -1 OFFSET :param_1",
+ {'param_1': 5})])
self.assert_sql(testing.db, lambda: q[2:2], [])
@@ -3213,3 +3215,96 @@ class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"SELECT x HAVING x = 1",
dialect=self._dialect(False)
)
+
+
+class SessionBindTest(QueryTest):
+
+ @contextlib.contextmanager
+ def _assert_bind_args(self, session):
+ get_bind = mock.Mock(side_effect=session.get_bind)
+ with mock.patch.object(session, "get_bind", get_bind):
+ yield
+ for call_ in get_bind.mock_calls:
+ is_(call_[1][0], inspect(self.classes.User))
+ is_not_(call_[2]['clause'], None)
+
+ def test_single_entity_q(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).all()
+
+ def test_sql_expr_entity_q(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User.id).all()
+
+ def test_count(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).count()
+
+ def test_aggregate_fn(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(func.max(User.name)).all()
+
+ def test_bulk_update_no_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).update(
+ {"name": "foob"}, synchronize_session=False)
+
+ def test_bulk_delete_no_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).delete(
+ synchronize_session=False)
+
+ def test_bulk_update_fetch_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).update(
+ {"name": "foob"}, synchronize_session='fetch')
+
+ def test_bulk_delete_fetch_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).delete(
+ synchronize_session='fetch')
+
+ def test_column_property(self):
+ User = self.classes.User
+
+ mapper = inspect(User)
+ mapper.add_property(
+ "score",
+ column_property(func.coalesce(self.tables.users.c.name, None)))
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(func.max(User.score)).scalar()
+
+ def test_column_property_select(self):
+ User = self.classes.User
+ Address = self.classes.Address
+
+ mapper = inspect(User)
+ mapper.add_property(
+ "score",
+ column_property(
+ select([func.sum(Address.id)]).
+ where(Address.user_id == User.id).as_scalar()
+ )
+ )
+ session = Session()
+
+ with self._assert_bind_args(session):
+ session.query(func.max(User.score)).scalar()
+