| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-06-30 19:10:06 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-07-01 12:14:02 -0400 |
| commit | 741af02893a19c879ba4d929151b9358aeb48148 (patch) | |
| tree | 7b408bb82033d90cff4ba9e78ead2c0cda27411f /lib/sqlalchemy/engine | |
| parent | 286e5fb649f77367883800ba4ec3d536e8031ca8 (diff) | |
| download | sqlalchemy-741af02893a19c879ba4d929151b9358aeb48148.tar.gz | |
repair yield_per for non-SS dialects and add new options

Implemented new :paramref:`_engine.Connection.execution_options.yield_per`
execution option for :class:`_engine.Connection` in Core, to mirror the
:ref:`yield_per <orm_queryguide_yield_per>` option already available in
the ORM. The option sets the
:paramref:`_engine.Connection.execution_options.stream_results` option and
invokes :meth:`_engine.Result.yield_per` at the same time, providing the
most common streaming result configuration with the same usage pattern as
the ORM.

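As a rough illustration of what the option implies at execution time, the sketch below uses plain dictionaries to mimic the merge this change adds to `Connection`: a truthy `yield_per` also sets `stream_results=True` and `max_row_buffer` to the same value. The helper name `expand_yield_per` is hypothetical and not part of SQLAlchemy; the real code works on an immutable options mapping via `.union()`.

```python
def expand_yield_per(execution_options):
    """Return a new options dict with the settings implied by yield_per.

    Hypothetical helper for illustration only; plain dicts stand in for
    SQLAlchemy's immutable execution options mapping.
    """
    merged = dict(execution_options)
    yp = merged.get("yield_per")
    if yp:
        # yield_per implies streaming plus a row buffer capped at the same size
        merged.update({"stream_results": True, "max_row_buffer": yp})
    return merged


opts = expand_yield_per({"yield_per": 100})
print(opts)
```

Options that do not include `yield_per` pass through unchanged, so the expansion only affects callers who opted in.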
Fixed bug in :class:`_engine.Result` where a buffered result strategy
would not be used if the dialect in use did not support an explicit
"server side cursor" setting, when using
:paramref:`_engine.Connection.execution_options.stream_results`. This was in
error, as DBAPIs such as those of SQLite and Oracle already use a
non-buffered result fetching scheme, which still benefits from
partial result fetching. The "buffered" strategy is now used in all
cases where :paramref:`_engine.Connection.execution_options.stream_results`
is set.

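To give a feel for the "buffered" strategy when `stream_results` is used without `yield_per`, here is a minimal sketch of a geometrically growing fetch size. The `growth_factor=5` and the `max_row_buffer` default of 1000 appear in the diff; the starting size of 1 and the function name `buffer_sizes` are assumptions made for illustration, not a copy of the library's implementation.

```python
from itertools import islice


def buffer_sizes(max_row_buffer=1000, growth_factor=5, initial=1):
    """Yield the number of rows requested on each successive fetch.

    Illustrative only: `initial` is an assumed starting size.
    """
    size = initial
    while True:
        yield size
        # Grow geometrically, but never past the configured ceiling.
        size = min(max_row_buffer, size * growth_factor)


sizes = list(islice(buffer_sizes(), 7))
print(sizes)
```

A small first fetch keeps single-row lookups cheap, while the cap bounds memory use on large result sets.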
Added :meth:`.FilterResult.yield_per` so that result implementations
such as :class:`.MappingResult`, :class:`.ScalarResult` and
:class:`.AsyncResult` have access to this method.

Fixes: #8199
Change-Id: I6dde3cbe483a1bf81e945561b60f4b7d1c434750

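The pass-through pattern behind the new filtering-result method can be sketched with two toy classes. `ToyResult` and `ToyScalarFilter` are hypothetical stand-ins, not SQLAlchemy classes; the point is only that the wrapper forwards `yield_per()` to the wrapped result and returns itself so calls can be chained.

```python
class ToyResult:
    """Stand-in for a row-oriented result; remembers the batch size."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.batch_size = None

    def yield_per(self, num):
        self.batch_size = num
        return self

    def partitions(self):
        # Fall back to one chunk holding everything when no size was set.
        size = self.batch_size or len(self._rows) or 1
        for i in range(0, len(self._rows), size):
            yield self._rows[i:i + size]


class ToyScalarFilter:
    """Stand-in for a filtering wrapper that returns first-column values."""

    def __init__(self, real_result):
        self._real_result = real_result

    def yield_per(self, num):
        # Forward to the wrapped result, then return the wrapper itself.
        self._real_result = self._real_result.yield_per(num)
        return self

    def partitions(self):
        for chunk in self._real_result.partitions():
            yield [row[0] for row in chunk]


scalars = ToyScalarFilter(ToyResult([(1,), (2,), (3,), (4,), (5,)])).yield_per(2)
chunks = list(scalars.partitions())
print(chunks)
```

Because the wrapper returns itself, the batch size can be set after the caller has already switched to the filtered view of the result.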
Diffstat (limited to 'lib/sqlalchemy/engine')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | lib/sqlalchemy/engine/__init__.py | 1 |
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 85 |
| -rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 1 |
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 10 |
| -rw-r--r-- | lib/sqlalchemy/engine/result.py | 81 |
5 files changed, 153 insertions, 25 deletions
diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py
index 77c2fea40..7bbeb1e73 100644
--- a/lib/sqlalchemy/engine/__init__.py
+++ b/lib/sqlalchemy/engine/__init__.py
@@ -41,6 +41,7 @@ from .reflection import Inspector as Inspector
 from .reflection import ObjectKind as ObjectKind
 from .reflection import ObjectScope as ObjectScope
 from .result import ChunkedIteratorResult as ChunkedIteratorResult
+from .result import FilterResult as FilterResult
 from .result import FrozenResult as FrozenResult
 from .result import IteratorResult as IteratorResult
 from .result import MappingResult as MappingResult
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index fdccf076d..aafa94047 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -358,15 +358,86 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
           :class:`_sql.Executable`.
 
           Indicate to the dialect that results should be
-          "streamed" and not pre-buffered, if possible. This is a limitation
-          of many DBAPIs. The flag is currently understood within a subset
-          of dialects within the PostgreSQL and MySQL categories, and
-          may be supported by other third party dialects as well.
+          "streamed" and not pre-buffered, if possible. For backends
+          such as PostgreSQL, MySQL and MariaDB, this indicates the use of
+          a "server side cursor" as opposed to a client side cursor.
+          Other backends such as that of Oracle may already use server
+          side cursors by default.
+
+          The usage of
+          :paramref:`_engine.Connection.execution_options.stream_results` is
+          usually combined with setting a fixed number of rows to to be fetched
+          in batches, to allow for efficient iteration of database rows while
+          at the same time not loading all result rows into memory at once;
+          this can be configured on a :class:`_engine.Result` object using the
+          :meth:`_engine.Result.yield_per` method, after execution has
+          returned a new :class:`_engine.Result`. If
+          :meth:`_engine.Result.yield_per` is not used,
+          the :paramref:`_engine.Connection.execution_options.stream_results`
+          mode of operation will instead use a dynamically sized buffer
+          which buffers sets of rows at a time, growing on each batch
+          based on a fixed growth size up until a limit which may
+          be configured using the
+          :paramref:`_engine.Connection.execution_options.max_row_buffer`
+          parameter.
+
+          When using the ORM to fetch ORM mapped objects from a result,
+          :meth:`_engine.Result.yield_per` should always be used with
+          :paramref:`_engine.Connection.execution_options.stream_results`,
+          so that the ORM does not fetch all rows into new ORM objects at once.
+
+          For typical use, the
+          :paramref:`_engine.Connection.execution_options.yield_per` execution
+          option should be preferred, which sets up both
+          :paramref:`_engine.Connection.execution_options.stream_results` and
+          :meth:`_engine.Result.yield_per` at once. This option is supported
+          both at a core level by :class:`_engine.Connection` as well as by the
+          ORM :class:`_engine.Session`; the latter is described at
+          :ref:`orm_queryguide_yield_per`.
 
           .. seealso::
 
+            :ref:`engine_stream_results` - background on
+            :paramref:`_engine.Connection.execution_options.stream_results`
+
+            :paramref:`_engine.Connection.execution_options.max_row_buffer`
+
+            :paramref:`_engine.Connection.execution_options.yield_per`
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            describing the ORM version of ``yield_per``
+
+        :param max_row_buffer: Available on: :class:`_engine.Connection`,
+          :class:`_sql.Executable`. Sets a maximum
+          buffer size to use when the
+          :paramref:`_engine.Connection.execution_options.stream_results`
+          execution option is used on a backend that supports server side
+          cursors. The default value if not specified is 1000.
+
+          .. seealso::
+
+            :paramref:`_engine.Connection.execution_options.stream_results`
+
             :ref:`engine_stream_results`
 
+
+        :param yield_per: Available on: :class:`_engine.Connection`,
+          :class:`_sql.Executable`. Integer value applied which will
+          set the :paramref:`_engine.Connection.execution_options.stream_results`
+          execution option and invoke :meth:`_engine.Result.yield_per`
+          automatically at once. Allows equivalent functionality as
+          is present when using this parameter with the ORM.
+
+          .. versionadded:: 1.4.40
+
+          .. seealso::
+
+            :ref:`engine_stream_results` - background and examples
+            on using server side cursors with Core.
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            describing the ORM version of ``yield_per``
+
         :param schema_translate_map: Available on: :class:`_engine.Connection`,
           :class:`_engine.Engine`, :class:`_sql.Executable`.
 
@@ -1683,6 +1754,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
         """Create an :class:`.ExecutionContext` and execute, returning
         a :class:`_engine.CursorResult`."""
 
+        if execution_options:
+            yp = execution_options.get("yield_per", None)
+            if yp:
+                execution_options = execution_options.union(
+                    {"stream_results": True, "max_row_buffer": yp}
+                )
         try:
             conn = self._dbapi_connection
             if conn is None:
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index 4b0047e34..3ff815f70 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -1034,7 +1034,6 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
         growth_factor=5,
         initial_buffer=None,
     ):
-
         self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
 
         if initial_buffer is not None:
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 5d3ff8bb7..cab96eac1 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -1473,11 +1473,16 @@ class DefaultExecutionContext(ExecutionContext):
         return self.dialect.supports_sane_multi_rowcount
 
     def _setup_result_proxy(self):
+        exec_opt = self.execution_options
+
         if self.is_crud or self.is_text:
             result = self._setup_dml_or_text_result()
+            yp = sr = False
         else:
+            yp = exec_opt.get("yield_per", None)
+            sr = self._is_server_side or exec_opt.get("stream_results", False)
             strategy = self.cursor_fetch_strategy
-            if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
+            if sr and strategy is _cursor._DEFAULT_FETCH:
                 strategy = _cursor.BufferedRowCursorFetchStrategy(
                     self.cursor, self.execution_options
                 )
@@ -1501,6 +1506,9 @@ class DefaultExecutionContext(ExecutionContext):
 
         self._soft_closed = result._soft_closed
 
+        if yp:
+            result = result.yield_per(yp)
+
         return result
 
     def _setup_out_parameters(self, result):
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index 52be2603c..a4e373ec3 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -934,7 +934,7 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
 
     @_generative
     def yield_per(self: SelfResult, num: int) -> SelfResult:
-        """Configure the row-fetching strategy to fetch num rows at a time.
+        """Configure the row-fetching strategy to fetch ``num`` rows at a time.
 
         This impacts the underlying behavior of the result when iterating over
         the result object, or otherwise making use of methods such as
@@ -949,16 +949,24 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
         conjunction with the
         :paramref:`_engine.Connection.execution_options.stream_results`
         execution option, which will allow the database dialect in use to make
-        use of a server side cursor, if the DBAPI supports it.
+        use of a server side cursor, if the DBAPI supports a specific "server
+        side cursor" mode separate from its default mode of operation.
 
-        Most DBAPIs do not use server side cursors by default, which means all
-        rows will be fetched upfront from the database regardless of the
-        :meth:`_engine.Result.yield_per` setting. However,
-        :meth:`_engine.Result.yield_per` may still be useful in that it batches
-        the SQLAlchemy-side processing of the raw data from the database, and
-        additionally when used for ORM scenarios will batch the conversion of
-        database rows into ORM entity rows.
+        .. tip::
 
+            Consider using the
+            :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option, which will simultaneously set
+            :paramref:`_engine.Connection.execution_options.stream_results`
+            to ensure the use of server side cursors, as well as automatically
+            invoke the :meth:`_engine.Result.yield_per` method to establish
+            a fixed row buffer size at once.
+
+            The :paramref:`_engine.Connection.execution_options.yield_per`
+            execution option is available for ORM operations, with
+            :class:`_orm.Session`-oriented use described at
+            :ref:`orm_queryguide_yield_per`. The Core-only version which works
+            with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.
 
         .. versionadded:: 1.4
 
@@ -967,9 +975,10 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
 
         .. seealso::
 
-            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+            :ref:`engine_stream_results` - describes Core behavior for
+            :meth:`_engine.Result.yield_per`
 
-            :meth:`_engine.Result.partitions`
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
 
         """
         self._yield_per = num
@@ -1219,24 +1228,29 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
 
         When using the ORM, the :meth:`_engine.Result.partitions` method
         is typically more effective from a memory perspective when it is
-        combined with use of the :meth:`_engine.Result.yield_per` method,
-        which instructs the ORM loading internals to only build a certain
-        amount of ORM objects from a result at a time before yielding
-        them out.
+        combined with use of the
+        :ref:`yield_per execution option <orm_queryguide_yield_per>`,
+        which instructs both the DBAPI driver to use server side cursors,
+        if available, as well as instructs the ORM loading internals to only
+        build a certain amount of ORM objects from a result at a time before
+        yielding them out.
 
         .. versionadded:: 1.4
 
         :param size: indicate the maximum number of rows to be present
          in each list yielded. If None, makes use of the value set by
-         :meth:`_engine.Result.yield_per`, if present, otherwise uses the
-         :meth:`_engine.Result.fetchmany` default which may be backend
-         specific.
+         the :meth:`_engine.Result.yield_per`, method, if it were called,
+         or the :paramref:`_engine.Connection.execution_options.yield_per`
+         execution option, which is equivalent in this regard. If
+         yield_per weren't set, it makes use of the
+         :meth:`_engine.Result.fetchmany` default, which may be backend
+         specific and not well defined.
 
         :return: iterator of lists
 
         .. seealso::
 
-            :paramref:`.Connection.execution_options.stream_results`
+            :ref:`engine_stream_results`
 
             :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
 
@@ -1517,10 +1531,17 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
         return MergedResult(self._metadata, (self,) + others)
 
 
+SelfFilterResult = TypeVar("SelfFilterResult", bound="FilterResult[Any]")
+
+
 class FilterResult(ResultInternal[_R]):
     """A wrapper for a :class:`_engine.Result` that returns objects other than
     :class:`_result.Row` objects, such as dictionaries or scalar objects.
 
+    :class:`.FilterResult` is the common base for additional result
+    APIs including :class:`.MappingResult`, :class:`.ScalarResult`
+    and :class:`.AsyncResult`.
+
     """
 
     __slots__ = (
@@ -1535,6 +1556,28 @@ class FilterResult(ResultInternal[_R]):
 
     _real_result: Result[Any]
 
+    @_generative
+    def yield_per(self: SelfFilterResult, num: int) -> SelfFilterResult:
+        """Configure the row-fetching strategy to fetch ``num`` rows at a time.
+
+        The :meth:`_engine.FilterResult.yield_per` method is a pass through
+        to the :meth:`_engine.Result.yield_per` method. See that method's
+        documentation for usage notes.
+
+        .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
+           so that the method is available on all result set implementations
+
+        .. seealso::
+
+            :ref:`engine_stream_results` - describes Core behavior for
+            :meth:`_engine.Result.yield_per`
+
+            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
+
+        """
+        self._real_result = self._real_result.yield_per(num)
+        return self
+
     def _soft_close(self, hard: bool = False) -> None:
         self._real_result._soft_close(hard=hard)
 
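The `partitions()` docstring in the diff describes yielding lists of at most `size` rows. A DBAPI-level sketch of that idea, using only the standard library `sqlite3` module, might look like the following. This is not SQLAlchemy's implementation; the function `partitions` here is a free-standing helper written for illustration, and the table `t` is made up for the demo.

```python
import sqlite3


def partitions(cursor, size):
    """Yield lists of at most `size` rows until the cursor is exhausted."""
    while True:
        chunk = cursor.fetchmany(size)
        if not chunk:
            break
        yield chunk


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t (x) VALUES (?)", [(i,) for i in range(10)])

cursor = conn.execute("SELECT x FROM t ORDER BY x")
chunk_sizes = [len(chunk) for chunk in partitions(cursor, 4)]
print(chunk_sizes)
conn.close()
```

Only one chunk is held in memory at a time, which is the property the `yield_per` option aims to provide for both Core and ORM results.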
