diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-04-05 11:58:52 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-04-21 11:30:40 -0400 |
| commit | cf6872d3bdf1a8a9613e853694acc2b1e6f06f51 (patch) | |
| tree | 3a4ee41ab8b48aea7ac1e275c2f553763ec28dad /lib/sqlalchemy/engine | |
| parent | 63f51491c5f0cb22883c800a065d7c4b4c54774e (diff) | |
| download | sqlalchemy-cf6872d3bdf1a8a9613e853694acc2b1e6f06f51.tar.gz | |
add deterministic imv returning ordering using sentinel columns
Repaired a major shortcoming which was identified in the
:ref:`engine_insertmanyvalues` performance optimization feature first
introduced in the 2.0 series. This was a continuation of the change in
2.0.9 which disabled the SQL Server version of the feature due to a
reliance in the ORM on apparent row ordering that is not guaranteed to take
place. The fix applies new logic to all "insertmanyvalues" operations,
which takes effect when a new parameter
:paramref:`_dml.Insert.returning.sort_by_parameter_order` on the
:meth:`_dml.Insert.returning` or :meth:`_dml.UpdateBase.return_defaults`
methods, that through a combination of alternate SQL forms, direct
correspondence of client side parameters, and in some cases downgrading to
running row-at-a-time, will apply sorting to each batch of returned rows
using correspondence to primary key or other unique values in each row
which can be correlated to the input data.
Performance impact is expected to be minimal as nearly all common primary
key scenarios are suitable for parameter-ordered batching to be
achieved for all backends other than SQLite, while "row-at-a-time"
mode operates with a bare minimum of Python overhead compared to the very
heavyweight approaches used in the 1.x series. For SQLite, there is no
difference in performance when "row-at-a-time" mode is used.
It's anticipated that with an efficient "row-at-a-time" INSERT with
RETURNING batching capability, the "insertmanyvalues" feature can be later
be more easily generalized to third party backends that include RETURNING
support but not necessarily easy ways to guarantee a correspondence
with parameter order.
Fixes: #9618
References: #9603
Change-Id: I1d79353f5f19638f752936ba1c35e4dc235a8b7c
Diffstat (limited to 'lib/sqlalchemy/engine')
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 48 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 12 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 247 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/interfaces.py | 91 |
4 files changed, 328 insertions, 70 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index ba2c44ed7..dac7c9473 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -2020,13 +2020,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): if self._echo: stats = context._get_cache_stats() + " (insertmanyvalues)" - for ( - sub_stmt, - sub_params, - setinputsizes, - batchnum, - totalbatches, - ) in dialect._deliver_insertmanyvalues_batches( + + for imv_batch in dialect._deliver_insertmanyvalues_batches( cursor, str_statement, effective_parameters, @@ -2034,20 +2029,25 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): context, ): - if setinputsizes: + if imv_batch.processed_setinputsizes: try: dialect.do_set_input_sizes( - context.cursor, setinputsizes, context + context.cursor, + imv_batch.processed_setinputsizes, + context, ) except BaseException as e: self._handle_dbapi_exception( e, - sql_util._long_statement(sub_stmt), - sub_params, + sql_util._long_statement(imv_batch.replaced_statement), + imv_batch.replaced_parameters, None, context, ) + sub_stmt = imv_batch.replaced_statement + sub_params = imv_batch.replaced_parameters + if engine_events: for fn in self.dispatch.before_cursor_execute: sub_stmt, sub_params = fn( @@ -2063,11 +2063,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): self._log_info(sql_util._long_statement(sub_stmt)) - if batchnum > 1: - stats = ( - f"insertmanyvalues batch {batchnum} " - f"of {totalbatches}" - ) + imv_stats = f""" { + imv_batch.batchnum}/{imv_batch.total_batches} ({ + 'ordered' + if imv_batch.rows_sorted else 'unordered' + }{ + '; batch not supported' + if imv_batch.is_downgraded + else '' + })""" + + if imv_batch.batchnum == 1: + stats += imv_stats + else: + stats = f"insertmanyvalues{imv_stats}" if not self.engine.hide_parameters: self._log_info( @@ -2096,7 +2105,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): ): break else: - dialect.do_execute(cursor, sub_stmt, sub_params, context) + dialect.do_execute( + cursor, + sub_stmt, + sub_params, + context, + ) except BaseException as e: self._handle_dbapi_exception( diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index 1f171ddb0..aaf2c1918 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -1748,13 +1748,18 @@ class CursorResult(Result[_T]): position in the result. The expected use case here is so that multiple INSERT..RETURNING - statements against different tables can produce a single result - that looks like a JOIN of those two tables. + statements (which definitely need to be sorted) against different + tables can produce a single result that looks like a JOIN of those two + tables. E.g.:: r1 = connection.execute( - users.insert().returning(users.c.user_name, users.c.user_id), + users.insert().returning( + users.c.user_name, + users.c.user_id, + sort_by_parameter_order=True + ), user_values ) @@ -1763,6 +1768,7 @@ class CursorResult(Result[_T]): addresses.c.address_id, addresses.c.address, addresses.c.user_id, + sort_by_parameter_order=True ), address_values ) diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 462473de2..8992334ee 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -17,6 +17,7 @@ as the base class for their own corresponding classes. from __future__ import annotations import functools +import operator import random import re from time import perf_counter @@ -60,6 +61,7 @@ from ..sql import type_api from ..sql._typing import is_tuple_type from ..sql.base import _NoArg from ..sql.compiler import DDLCompiler +from ..sql.compiler import InsertmanyvaluesSentinelOpts from ..sql.compiler import SQLCompiler from ..sql.elements import quoted_name from ..util.typing import Final @@ -223,6 +225,10 @@ class DefaultDialect(Dialect): use_insertmanyvalues_wo_returning: bool = False + insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( + InsertmanyvaluesSentinelOpts.NOT_SUPPORTED + ) + insertmanyvalues_page_size: int = 1000 insertmanyvalues_max_parameters = 32700 @@ -369,13 +375,42 @@ class DefaultDialect(Dialect): and self.delete_returning ) - @property + @util.memoized_property def insert_executemany_returning(self): - return ( - self.insert_returning - and self.supports_multivalues_insert - and self.use_insertmanyvalues - ) + """Default implementation for insert_executemany_returning, if not + otherwise overridden by the specific dialect. + + The default dialect determines "insert_executemany_returning" is + available if the dialect in use has opted into using the + "use_insertmanyvalues" feature. If they haven't opted into that, then + this attribute is False, unless the dialect in question overrides this + and provides some other implementation (such as the Oracle dialect). + + """ + return self.insert_returning and self.use_insertmanyvalues + + @util.memoized_property + def insert_executemany_returning_sort_by_parameter_order(self): + """Default implementation for + insert_executemany_returning_deterministic_order, if not otherwise + overridden by the specific dialect. + + The default dialect determines "insert_executemany_returning" can have + deterministic order only if the dialect in use has opted into using the + "use_insertmanyvalues" feature, which implements deterministic ordering + using client side sentinel columns only by default. The + "insertmanyvalues" feature also features alternate forms that can + use server-generated PK values as "sentinels", but those are only + used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` + bitflag enables those alternate SQL forms, which are disabled + by default. + + If the dialect in use hasn't opted into that, then this attribute is + False, unless the dialect in question overrides this and provides some + other implementation (such as the Oracle dialect). + + """ + return self.insert_returning and self.use_insertmanyvalues update_executemany_returning = False delete_executemany_returning = False @@ -725,20 +760,156 @@ class DefaultDialect(Dialect): context = cast(DefaultExecutionContext, context) compiled = cast(SQLCompiler, context.compiled) + imv = compiled._insertmanyvalues + assert imv is not None + is_returning: Final[bool] = bool(compiled.effective_returning) batch_size = context.execution_options.get( "insertmanyvalues_page_size", self.insertmanyvalues_page_size ) + sentinel_value_resolvers = None + if is_returning: - context._insertmanyvalues_rows = result = [] + result: Optional[List[Any]] = [] + context._insertmanyvalues_rows = result + + sort_by_parameter_order = imv.sort_by_parameter_order - for batch_rec in compiled._deliver_insertmanyvalues_batches( - statement, parameters, generic_setinputsizes, batch_size + if imv.num_sentinel_columns: + sentinel_value_resolvers = ( + compiled._imv_sentinel_value_resolvers + ) + else: + sort_by_parameter_order = False + result = None + + for imv_batch in compiled._deliver_insertmanyvalues_batches( + statement, + parameters, + generic_setinputsizes, + batch_size, + sort_by_parameter_order, ): - yield batch_rec + yield imv_batch + if is_returning: - result.extend(cursor.fetchall()) + rows = context.fetchall_for_returning(cursor) + + # I would have thought "is_returning: Final[bool]" + # would have assured this but pylance thinks not + assert result is not None + + if imv.num_sentinel_columns and not imv_batch.is_downgraded: + composite_sentinel = imv.num_sentinel_columns > 1 + if imv.implicit_sentinel: + # for implicit sentinel, which is currently single-col + # integer autoincrement, do a simple sort. + assert not composite_sentinel + result.extend( + sorted(rows, key=operator.itemgetter(-1)) + ) + continue + + # otherwise, create dictionaries to match up batches + # with parameters + assert imv.sentinel_param_keys + + if composite_sentinel: + _nsc = imv.num_sentinel_columns + rows_by_sentinel = { + tuple(row[-_nsc:]): row for row in rows + } + else: + rows_by_sentinel = {row[-1]: row for row in rows} + + if len(rows_by_sentinel) != len(imv_batch.batch): + # see test_insert_exec.py:: + # IMVSentinelTest::test_sentinel_incorrect_rowcount + # for coverage / demonstration + raise exc.InvalidRequestError( + f"Sentinel-keyed result set did not produce " + f"correct number of rows {len(imv_batch.batch)}; " + "produced " + f"{len(rows_by_sentinel)}. Please ensure the " + "sentinel column is fully unique and populated in " + "all cases." + ) + + try: + if composite_sentinel: + if sentinel_value_resolvers: + # composite sentinel (PK) with value resolvers + ordered_rows = [ + rows_by_sentinel[ + tuple( + _resolver(parameters[_spk]) # type: ignore # noqa: E501 + if _resolver + else parameters[_spk] # type: ignore # noqa: E501 + for _resolver, _spk in zip( + sentinel_value_resolvers, + imv.sentinel_param_keys, + ) + ) + ] + for parameters in imv_batch.batch + ] + else: + # composite sentinel (PK) with no value + # resolvers + ordered_rows = [ + rows_by_sentinel[ + tuple( + parameters[_spk] # type: ignore + for _spk in imv.sentinel_param_keys + ) + ] + for parameters in imv_batch.batch + ] + else: + _sentinel_param_key = imv.sentinel_param_keys[0] + if ( + sentinel_value_resolvers + and sentinel_value_resolvers[0] + ): + # single-column sentinel with value resolver + _sentinel_value_resolver = ( + sentinel_value_resolvers[0] + ) + ordered_rows = [ + rows_by_sentinel[ + _sentinel_value_resolver( + parameters[_sentinel_param_key] # type: ignore # noqa: E501 + ) + ] + for parameters in imv_batch.batch + ] + else: + # single-column sentinel with no value resolver + ordered_rows = [ + rows_by_sentinel[ + parameters[_sentinel_param_key] # type: ignore # noqa: E501 + ] + for parameters in imv_batch.batch + ] + except KeyError as ke: + # see test_insert_exec.py:: + # IMVSentinelTest::test_sentinel_cant_match_keys + # for coverage / demonstration + raise exc.InvalidRequestError( + f"Can't match sentinel values in result set to " + f"parameter sets; key {ke.args[0]!r} was not " + "found. " + "There may be a mismatch between the datatype " + "passed to the DBAPI driver vs. that which it " + "returns in a result row. Try using a different " + "datatype, such as integer" + ) from ke + + result.extend(ordered_rows) + + else: + result.extend(rows) def do_executemany(self, cursor, statement, parameters, context=None): cursor.executemany(statement, parameters) @@ -1043,6 +1214,7 @@ class DefaultExecutionContext(ExecutionContext): _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None + _num_sentinel_cols: int = 0 @classmethod def _init_ddl( @@ -1152,6 +1324,17 @@ class DefaultExecutionContext(ExecutionContext): ) elif ( ii + and dml_statement._sort_by_parameter_order + and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 + ): + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "INSERT..RETURNING with deterministic row ordering " + "when executemany is used" + ) + elif ( + ii and self.dialect.use_insertmanyvalues and not compiled._insertmanyvalues ): @@ -1194,6 +1377,10 @@ class DefaultExecutionContext(ExecutionContext): if len(parameters) > 1: if self.isinsert and compiled._insertmanyvalues: self.execute_style = ExecuteStyle.INSERTMANYVALUES + + imv = compiled._insertmanyvalues + if imv.sentinel_columns is not None: + self._num_sentinel_cols = imv.num_sentinel_columns else: self.execute_style = ExecuteStyle.EXECUTEMANY @@ -1525,6 +1712,9 @@ class DefaultExecutionContext(ExecutionContext): self._is_server_side = False return self.create_default_cursor() + def fetchall_for_returning(self, cursor): + return cursor.fetchall() + def create_default_cursor(self): return self._dbapi_connection.cursor() @@ -1689,6 +1879,13 @@ class DefaultExecutionContext(ExecutionContext): ) if cursor_description is None: strategy = _cursor._NO_CURSOR_DML + elif self._num_sentinel_cols: + assert self.execute_style is ExecuteStyle.INSERTMANYVALUES + if cursor_description: + # strip out the sentinel columns from cursor description + cursor_description = cursor_description[ + 0 : -(self._num_sentinel_cols) + ] result: _cursor.CursorResult[Any] = _cursor.CursorResult( self, strategy, cursor_description @@ -2059,21 +2256,14 @@ class DefaultExecutionContext(ExecutionContext): key_getter = compiled._within_exec_param_key_getter - # pre-determine scalar Python-side defaults - # to avoid many calls of get_insert_default()/ - # get_update_default() + sentinel_counter = 0 + if compiled.insert_prefetch: prefetch_recs = [ ( c, key_getter(c), - ( - c.default.arg, # type: ignore - c.default.is_scalar, - c.default.is_callable, - ) - if c.default and c.default.has_arg - else (None, None, None), + c._default_description_tuple, self.get_insert_default, ) for c in compiled.insert_prefetch @@ -2083,13 +2273,7 @@ class DefaultExecutionContext(ExecutionContext): ( c, key_getter(c), - ( - c.onupdate.arg, # type: ignore - c.onupdate.is_scalar, - c.onupdate.is_callable, - ) - if c.onupdate and c.onupdate.has_arg - else (None, None, None), + c._onupdate_description_tuple, self.get_update_default, ) for c in compiled.update_prefetch @@ -2103,10 +2287,13 @@ class DefaultExecutionContext(ExecutionContext): for ( c, param_key, - (arg, is_scalar, is_callable), + (arg, is_scalar, is_callable, is_sentinel), fallback, ) in prefetch_recs: - if is_scalar: + if is_sentinel: + param[param_key] = sentinel_counter + sentinel_counter += 1 + elif is_scalar: param[param_key] = arg elif is_callable: self.current_column = c diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 254aba4bc..0216c155d 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -55,8 +55,10 @@ if TYPE_CHECKING: from ..event import dispatcher from ..exc import StatementError from ..sql import Executable + from ..sql.compiler import _InsertManyValuesBatch from ..sql.compiler import DDLCompiler from ..sql.compiler import IdentifierPreparer + from ..sql.compiler import InsertmanyvaluesSentinelOpts from ..sql.compiler import Linting from ..sql.compiler import SQLCompiler from ..sql.elements import BindParameter @@ -236,14 +238,16 @@ _DBAPIMultiExecuteParams = Union[ _DBAPIAnyExecuteParams = Union[ _DBAPIMultiExecuteParams, _DBAPISingleExecuteParams ] -_DBAPICursorDescription = Tuple[ - str, - "DBAPIType", - Optional[int], - Optional[int], - Optional[int], - Optional[int], - Optional[bool], +_DBAPICursorDescription = Sequence[ + Tuple[ + str, + "DBAPIType", + Optional[int], + Optional[int], + Optional[int], + Optional[int], + Optional[bool], + ] ] _AnySingleExecuteParams = _DBAPISingleExecuteParams @@ -609,9 +613,21 @@ class BindTyping(Enum): aren't. When RENDER_CASTS is used, the compiler will invoke the - :meth:`.SQLCompiler.render_bind_cast` method for each - :class:`.BindParameter` object whose dialect-level type sets the - :attr:`.TypeEngine.render_bind_cast` attribute. + :meth:`.SQLCompiler.render_bind_cast` method for the rendered + string representation of each :class:`.BindParameter` object whose + dialect-level type sets the :attr:`.TypeEngine.render_bind_cast` attribute. + + The :meth:`.SQLCompiler.render_bind_cast` is also used to render casts + for one form of "insertmanyvalues" query, when both + :attr:`.InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT` and + :attr:`.InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS` are set, + where the casts are applied to the intermediary columns e.g. + "INSERT INTO t (a, b, c) SELECT p0::TYP, p1::TYP, p2::TYP " + "FROM (VALUES (?, ?), (?, ?), ...)". + + .. versionadded:: 2.0.10 - :meth:`.SQLCompiler.render_bind_cast` is now + used within some elements of the "insertmanyvalues" implementation. + """ @@ -838,6 +854,14 @@ class Dialect(EventTarget): """ + insert_executemany_returning_sort_by_parameter_order: bool + """dialect / driver / database supports some means of providing + INSERT...RETURNING support when dialect.do_executemany() is used + along with the :paramref:`_dml.Insert.returning.sort_by_parameter_order` + parameter being set. + + """ + update_executemany_returning: bool """dialect supports UPDATE..RETURNING with executemany.""" @@ -881,6 +905,23 @@ class Dialect(EventTarget): .. versionadded:: 2.0 + .. seealso:: + + :ref:`engine_insertmanyvalues` + + """ + + insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts + """Options indicating the database supports a form of bulk INSERT where + the autoincrement integer primary key can be reliably used as an ordering + for INSERTed rows. + + .. versionadded:: 2.0.10 + + .. seealso:: + + :ref:`engine_insertmanyvalues_returning_order` + """ insertmanyvalues_page_size: int @@ -2116,15 +2157,7 @@ class Dialect(EventTarget): parameters: _DBAPIMultiExecuteParams, generic_setinputsizes: Optional[_GenericSetInputSizesType], context: ExecutionContext, - ) -> Iterator[ - Tuple[ - str, - _DBAPISingleExecuteParams, - _GenericSetInputSizesType, - int, - int, - ] - ]: + ) -> Iterator[_InsertManyValuesBatch]: """convert executemany parameters for an INSERT into an iterator of statement/single execute values, used by the insertmanyvalues feature. @@ -3112,6 +3145,24 @@ class ExecutionContext: raise NotImplementedError() + def fetchall_for_returning(self, cursor: DBAPICursor) -> Sequence[Any]: + """For a RETURNING result, deliver cursor.fetchall() from the + DBAPI cursor. + + This is a dialect-specific hook for dialects that have special + considerations when calling upon the rows delivered for a + "RETURNING" statement. Default implementation is + ``cursor.fetchall()``. + + This hook is currently used only by the :term:`insertmanyvalues` + feature. Dialects that don't set ``use_insertmanyvalues=True`` + don't need to consider this hook. + + .. versionadded:: 2.0.10 + + """ + raise NotImplementedError() + class ConnectionEventsTarget(EventTarget): """An object which can accept events from :class:`.ConnectionEvents`. |
