summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine
diff options
context:
space:
mode:
author Mike Bayer <mike_mp@zzzcomputing.com> 2022-06-30 19:10:06 -0400
committer Mike Bayer <mike_mp@zzzcomputing.com> 2022-07-01 12:14:02 -0400
commit 741af02893a19c879ba4d929151b9358aeb48148 (patch)
tree 7b408bb82033d90cff4ba9e78ead2c0cda27411f /lib/sqlalchemy/engine
parent 286e5fb649f77367883800ba4ec3d536e8031ca8 (diff)
download sqlalchemy-741af02893a19c879ba4d929151b9358aeb48148.tar.gz
repair yield_per for non-SS dialects and add new options
Implemented new :paramref:`_engine.Connection.execution_options.yield_per` execution option for :class:`_engine.Connection` in Core, to mirror that of the same :ref:`yield_per <orm_queryguide_yield_per>` option available in the ORM. The option sets the :paramref:`_engine.Connection.execution_options.stream_results` option at the same time as invoking :meth:`_engine.Result.yield_per`, to provide the most common streaming result configuration which also mirrors that of the ORM use case in its usage pattern. Fixed bug in :class:`_engine.Result` where a buffered result strategy would not be used if the dialect in use did not support an explicit "server side cursor" setting, when using :paramref:`_engine.Connection.execution_options.stream_results`. This was in error as DBAPIs such as that of SQLite and Oracle already use a non-buffered result fetching scheme, which still benefits from usage of partial result fetching. The "buffered" strategy is now used in all cases where :paramref:`_engine.Connection.execution_options.stream_results` is set. Added :meth:`.FilterResult.yield_per` so that result implementations such as :class:`.MappingResult`, :class:`.ScalarResult` and :class:`.AsyncResult` have access to this method. Fixes: #8199 Change-Id: I6dde3cbe483a1bf81e945561b60f4b7d1c434750
Diffstat (limited to 'lib/sqlalchemy/engine')
-rw-r--r-- lib/sqlalchemy/engine/__init__.py 1
-rw-r--r-- lib/sqlalchemy/engine/base.py 85
-rw-r--r-- lib/sqlalchemy/engine/cursor.py 1
-rw-r--r-- lib/sqlalchemy/engine/default.py 10
-rw-r--r-- lib/sqlalchemy/engine/result.py 81
5 files changed, 153 insertions, 25 deletions
diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py
index 77c2fea40..7bbeb1e73 100644
--- a/lib/sqlalchemy/engine/__init__.py
+++ b/lib/sqlalchemy/engine/__init__.py
@@ -41,6 +41,7 @@ from .reflection import Inspector as Inspector
from .reflection import ObjectKind as ObjectKind
from .reflection import ObjectScope as ObjectScope
from .result import ChunkedIteratorResult as ChunkedIteratorResult
+from .result import FilterResult as FilterResult
from .result import FrozenResult as FrozenResult
from .result import IteratorResult as IteratorResult
from .result import MappingResult as MappingResult
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index fdccf076d..aafa94047 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -358,15 +358,86 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
:class:`_sql.Executable`.
Indicate to the dialect that results should be
- "streamed" and not pre-buffered, if possible. This is a limitation
- of many DBAPIs. The flag is currently understood within a subset
- of dialects within the PostgreSQL and MySQL categories, and
- may be supported by other third party dialects as well.
+ "streamed" and not pre-buffered, if possible. For backends
+ such as PostgreSQL, MySQL and MariaDB, this indicates the use of
+ a "server side cursor" as opposed to a client side cursor.
+ Other backends such as that of Oracle may already use server
+ side cursors by default.
+
+ The usage of
+ :paramref:`_engine.Connection.execution_options.stream_results` is
+ usually combined with setting a fixed number of rows to be fetched
+ in batches, to allow for efficient iteration of database rows while
+ at the same time not loading all result rows into memory at once;
+ this can be configured on a :class:`_engine.Result` object using the
+ :meth:`_engine.Result.yield_per` method, after execution has
+ returned a new :class:`_engine.Result`. If
+ :meth:`_engine.Result.yield_per` is not used,
+ the :paramref:`_engine.Connection.execution_options.stream_results`
+ mode of operation will instead use a dynamically sized buffer
+ which buffers sets of rows at a time, growing on each batch
+ based on a fixed growth size up until a limit which may
+ be configured using the
+ :paramref:`_engine.Connection.execution_options.max_row_buffer`
+ parameter.
+
+ When using the ORM to fetch ORM mapped objects from a result,
+ :meth:`_engine.Result.yield_per` should always be used with
+ :paramref:`_engine.Connection.execution_options.stream_results`,
+ so that the ORM does not fetch all rows into new ORM objects at once.
+
+ For typical use, the
+ :paramref:`_engine.Connection.execution_options.yield_per` execution
+ option should be preferred, which sets up both
+ :paramref:`_engine.Connection.execution_options.stream_results` and
+ :meth:`_engine.Result.yield_per` at once. This option is supported
+ both at a core level by :class:`_engine.Connection` as well as by the
+ ORM :class:`_orm.Session`; the latter is described at
+ :ref:`orm_queryguide_yield_per`.
.. seealso::
+ :ref:`engine_stream_results` - background on
+ :paramref:`_engine.Connection.execution_options.stream_results`
+
+ :paramref:`_engine.Connection.execution_options.max_row_buffer`
+
+ :paramref:`_engine.Connection.execution_options.yield_per`
+
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+ describing the ORM version of ``yield_per``
+
+ :param max_row_buffer: Available on: :class:`_engine.Connection`,
+ :class:`_sql.Executable`. Sets a maximum
+ buffer size to use when the
+ :paramref:`_engine.Connection.execution_options.stream_results`
+ execution option is used on a backend that supports server side
+ cursors. The default value if not specified is 1000.
+
+ .. seealso::
+
+ :paramref:`_engine.Connection.execution_options.stream_results`
+
:ref:`engine_stream_results`
+
+ :param yield_per: Available on: :class:`_engine.Connection`,
+ :class:`_sql.Executable`. Integer value applied which will
+ set the :paramref:`_engine.Connection.execution_options.stream_results`
+ execution option and invoke :meth:`_engine.Result.yield_per`
+ automatically at once. Allows equivalent functionality as
+ is present when using this parameter with the ORM.
+
+ .. versionadded:: 1.4.40
+
+ .. seealso::
+
+ :ref:`engine_stream_results` - background and examples
+ on using server side cursors with Core.
+
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+ describing the ORM version of ``yield_per``
+
:param schema_translate_map: Available on: :class:`_engine.Connection`,
:class:`_engine.Engine`, :class:`_sql.Executable`.
@@ -1683,6 +1754,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
+ if execution_options:
+ yp = execution_options.get("yield_per", None)
+ if yp:
+ execution_options = execution_options.union(
+ {"stream_results": True, "max_row_buffer": yp}
+ )
try:
conn = self._dbapi_connection
if conn is None:
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index 4b0047e34..3ff815f70 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -1034,7 +1034,6 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
growth_factor=5,
initial_buffer=None,
):
-
self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
if initial_buffer is not None:
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 5d3ff8bb7..cab96eac1 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -1473,11 +1473,16 @@ class DefaultExecutionContext(ExecutionContext):
return self.dialect.supports_sane_multi_rowcount
def _setup_result_proxy(self):
+ exec_opt = self.execution_options
+
if self.is_crud or self.is_text:
result = self._setup_dml_or_text_result()
+ yp = sr = False
else:
+ yp = exec_opt.get("yield_per", None)
+ sr = self._is_server_side or exec_opt.get("stream_results", False)
strategy = self.cursor_fetch_strategy
- if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
+ if sr and strategy is _cursor._DEFAULT_FETCH:
strategy = _cursor.BufferedRowCursorFetchStrategy(
self.cursor, self.execution_options
)
@@ -1501,6 +1506,9 @@ class DefaultExecutionContext(ExecutionContext):
self._soft_closed = result._soft_closed
+ if yp:
+ result = result.yield_per(yp)
+
return result
def _setup_out_parameters(self, result):
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index 52be2603c..a4e373ec3 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -934,7 +934,7 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
@_generative
def yield_per(self: SelfResult, num: int) -> SelfResult:
- """Configure the row-fetching strategy to fetch num rows at a time.
+ """Configure the row-fetching strategy to fetch ``num`` rows at a time.
This impacts the underlying behavior of the result when iterating over
the result object, or otherwise making use of methods such as
@@ -949,16 +949,24 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
conjunction with the
:paramref:`_engine.Connection.execution_options.stream_results`
execution option, which will allow the database dialect in use to make
- use of a server side cursor, if the DBAPI supports it.
+ use of a server side cursor, if the DBAPI supports a specific "server
+ side cursor" mode separate from its default mode of operation.
- Most DBAPIs do not use server side cursors by default, which means all
- rows will be fetched upfront from the database regardless of the
- :meth:`_engine.Result.yield_per` setting. However,
- :meth:`_engine.Result.yield_per` may still be useful in that it batches
- the SQLAlchemy-side processing of the raw data from the database, and
- additionally when used for ORM scenarios will batch the conversion of
- database rows into ORM entity rows.
+ .. tip::
+ Consider using the
+ :paramref:`_engine.Connection.execution_options.yield_per`
+ execution option, which will simultaneously set
+ :paramref:`_engine.Connection.execution_options.stream_results`
+ to ensure the use of server side cursors, as well as automatically
+ invoke the :meth:`_engine.Result.yield_per` method to establish
+ a fixed row buffer size at once.
+
+ The :paramref:`_engine.Connection.execution_options.yield_per`
+ execution option is available for ORM operations, with
+ :class:`_orm.Session`-oriented use described at
+ :ref:`orm_queryguide_yield_per`. The Core-only version which works
+ with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.
.. versionadded:: 1.4
@@ -967,9 +975,10 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
.. seealso::
- :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+ :ref:`engine_stream_results` - describes Core behavior for
+ :meth:`_engine.Result.yield_per`
- :meth:`_engine.Result.partitions`
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
"""
self._yield_per = num
@@ -1219,24 +1228,29 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
When using the ORM, the :meth:`_engine.Result.partitions` method
is typically more effective from a memory perspective when it is
- combined with use of the :meth:`_engine.Result.yield_per` method,
- which instructs the ORM loading internals to only build a certain
- amount of ORM objects from a result at a time before yielding
- them out.
+ combined with use of the
+ :ref:`yield_per execution option <orm_queryguide_yield_per>`,
+ which instructs both the DBAPI driver to use server side cursors,
+ if available, as well as instructs the ORM loading internals to only
+ build a certain amount of ORM objects from a result at a time before
+ yielding them out.
.. versionadded:: 1.4
:param size: indicate the maximum number of rows to be present
in each list yielded. If None, makes use of the value set by
- :meth:`_engine.Result.yield_per`, if present, otherwise uses the
- :meth:`_engine.Result.fetchmany` default which may be backend
- specific.
+ the :meth:`_engine.Result.yield_per` method, if it was called,
+ or the :paramref:`_engine.Connection.execution_options.yield_per`
+ execution option, which is equivalent in this regard. If
+ yield_per was not set, it makes use of the
+ :meth:`_engine.Result.fetchmany` default, which may be backend
+ specific and not well defined.
:return: iterator of lists
.. seealso::
- :paramref:`.Connection.execution_options.stream_results`
+ :ref:`engine_stream_results`
:ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
@@ -1517,10 +1531,17 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
return MergedResult(self._metadata, (self,) + others)
+SelfFilterResult = TypeVar("SelfFilterResult", bound="FilterResult[Any]")
+
+
class FilterResult(ResultInternal[_R]):
"""A wrapper for a :class:`_engine.Result` that returns objects other than
:class:`_result.Row` objects, such as dictionaries or scalar objects.
+ :class:`.FilterResult` is the common base for additional result
+ APIs including :class:`.MappingResult`, :class:`.ScalarResult`
+ and :class:`.AsyncResult`.
+
"""
__slots__ = (
@@ -1535,6 +1556,28 @@ class FilterResult(ResultInternal[_R]):
_real_result: Result[Any]
+ @_generative
+ def yield_per(self: SelfFilterResult, num: int) -> SelfFilterResult:
+ """Configure the row-fetching strategy to fetch ``num`` rows at a time.
+
+ The :meth:`_engine.FilterResult.yield_per` method is a pass through
+ to the :meth:`_engine.Result.yield_per` method. See that method's
+ documentation for usage notes.
+
+ .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
+ so that the method is available on all result set implementations
+
+ .. seealso::
+
+ :ref:`engine_stream_results` - describes Core behavior for
+ :meth:`_engine.Result.yield_per`
+
+ :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+
+ """
+ self._real_result = self._real_result.yield_per(num)
+ return self
+
def _soft_close(self, hard: bool = False) -> None:
self._real_result._soft_close(hard=hard)