summaryrefslogtreecommitdiff
path: root/test/orm/test_query.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-09-25 22:31:16 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-10-31 13:44:53 -0400
commit654b462d668a2ced4e87077b9babb2590acbf983 (patch)
tree8b6023480423e990c9bbca7c280cb1cb58e012fc /test/orm/test_query.py
parent841eb216644202567ebddfc0badc51a3a35e98c3 (diff)
downloadsqlalchemy-review/mike_bayer/tutorial20.tar.gz
Add SelectBase.exists() method as it seems strange this is not available already. The Exists construct itself does not provide full SELECT-building capabilities so it makes sense this should be used more like a scalar_subquery. Make sure stream_results is getting set up when yield_per is used, for 2.0 style statements as well. this was hardcoded inside of Query.yield_per() and is now moved to take place within QueryContext. Change-Id: Icafcd4fd9b708772343d56edf40995c9e8f835d6
Diffstat (limited to 'test/orm/test_query.py')
-rw-r--r--test/orm/test_query.py48
1 files changed, 42 insertions, 6 deletions
diff --git a/test/orm/test_query.py b/test/orm/test_query.py
index 63f50d4f9..8cca45b27 100644
--- a/test/orm/test_query.py
+++ b/test/orm/test_query.py
@@ -4453,19 +4453,55 @@ class YieldTest(_fixtures.FixtureTest):
except StopIteration:
pass
- def test_yield_per_and_execution_options(self):
+ def test_yield_per_and_execution_options_legacy(self):
self._eagerload_mappings()
User = self.classes.User
sess = create_session()
+
+ @event.listens_for(sess, "do_orm_execute")
+ def check(ctx):
+ eq_(ctx.load_options._yield_per, 15)
+ eq_(
+ {
+ k: v
+ for k, v in ctx.execution_options.items()
+ if not k.startswith("_")
+ },
+ {"max_row_buffer": 15, "stream_results": True, "foo": "bar"},
+ )
+
q = sess.query(User).yield_per(15)
q = q.execution_options(foo="bar")
- assert q.load_options._yield_per
- eq_(
- q._execution_options,
- {"stream_results": True, "foo": "bar", "max_row_buffer": 15},
- )
+
+ q.all()
+
+ def test_yield_per_and_execution_options(self):
+ self._eagerload_mappings()
+
+ User = self.classes.User
+
+ sess = create_session()
+
+ @event.listens_for(sess, "do_orm_execute")
+ def check(ctx):
+ eq_(ctx.load_options._yield_per, 15)
+ eq_(
+ {
+ k: v
+ for k, v in ctx.execution_options.items()
+ if not k.startswith("_")
+ },
+ {
+ "max_row_buffer": 15,
+ "stream_results": True,
+ "yield_per": 15,
+ },
+ )
+
+ stmt = select(User).execution_options(yield_per=15)
+ sess.execute(stmt)
def test_no_joinedload_opt(self):
self._eagerload_mappings()