diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2023-04-21 16:51:19 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@bbpush.zzzcomputing.com> | 2023-04-21 16:51:19 +0000 |
| commit | c84b3bf198c75ad4f42b0f83d482e480200e6d16 (patch) | |
| tree | 86e991ad8f43515ec7948ff809f44bb7d8b301fa /lib/sqlalchemy/engine | |
| parent | 95628d9707cdfbfdd229b2acee02fbadfbe7ced0 (diff) | |
| parent | cf6872d3bdf1a8a9613e853694acc2b1e6f06f51 (diff) | |
| download | sqlalchemy-c84b3bf198c75ad4f42b0f83d482e480200e6d16.tar.gz | |
Merge "add deterministic imv returning ordering using sentinel columns" into main
Diffstat (limited to 'lib/sqlalchemy/engine')
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 48 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 12 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 247 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/interfaces.py | 91 |
4 files changed, 328 insertions, 70 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index ba2c44ed7..dac7c9473 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -2020,13 +2020,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): if self._echo: stats = context._get_cache_stats() + " (insertmanyvalues)" - for ( - sub_stmt, - sub_params, - setinputsizes, - batchnum, - totalbatches, - ) in dialect._deliver_insertmanyvalues_batches( + + for imv_batch in dialect._deliver_insertmanyvalues_batches( cursor, str_statement, effective_parameters, @@ -2034,20 +2029,25 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): context, ): - if setinputsizes: + if imv_batch.processed_setinputsizes: try: dialect.do_set_input_sizes( - context.cursor, setinputsizes, context + context.cursor, + imv_batch.processed_setinputsizes, + context, ) except BaseException as e: self._handle_dbapi_exception( e, - sql_util._long_statement(sub_stmt), - sub_params, + sql_util._long_statement(imv_batch.replaced_statement), + imv_batch.replaced_parameters, None, context, ) + sub_stmt = imv_batch.replaced_statement + sub_params = imv_batch.replaced_parameters + if engine_events: for fn in self.dispatch.before_cursor_execute: sub_stmt, sub_params = fn( @@ -2063,11 +2063,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): self._log_info(sql_util._long_statement(sub_stmt)) - if batchnum > 1: - stats = ( - f"insertmanyvalues batch {batchnum} " - f"of {totalbatches}" - ) + imv_stats = f""" { + imv_batch.batchnum}/{imv_batch.total_batches} ({ + 'ordered' + if imv_batch.rows_sorted else 'unordered' + }{ + '; batch not supported' + if imv_batch.is_downgraded + else '' + })""" + + if imv_batch.batchnum == 1: + stats += imv_stats + else: + stats = f"insertmanyvalues{imv_stats}" if not self.engine.hide_parameters: self._log_info( @@ -2096,7 +2105,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): ): break else: - dialect.do_execute(cursor, sub_stmt, sub_params, context) + dialect.do_execute( + cursor, + sub_stmt, + sub_params, + context, + ) except BaseException as e: self._handle_dbapi_exception( diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index 1f171ddb0..aaf2c1918 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -1748,13 +1748,18 @@ class CursorResult(Result[_T]): position in the result. The expected use case here is so that multiple INSERT..RETURNING - statements against different tables can produce a single result - that looks like a JOIN of those two tables. + statements (which definitely need to be sorted) against different + tables can produce a single result that looks like a JOIN of those two + tables. E.g.:: r1 = connection.execute( - users.insert().returning(users.c.user_name, users.c.user_id), + users.insert().returning( + users.c.user_name, + users.c.user_id, + sort_by_parameter_order=True + ), user_values ) @@ -1763,6 +1768,7 @@ class CursorResult(Result[_T]): addresses.c.address_id, addresses.c.address, addresses.c.user_id, + sort_by_parameter_order=True ), address_values ) diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 462473de2..8992334ee 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -17,6 +17,7 @@ as the base class for their own corresponding classes. from __future__ import annotations import functools +import operator import random import re from time import perf_counter @@ -60,6 +61,7 @@ from ..sql import type_api from ..sql._typing import is_tuple_type from ..sql.base import _NoArg from ..sql.compiler import DDLCompiler +from ..sql.compiler import InsertmanyvaluesSentinelOpts from ..sql.compiler import SQLCompiler from ..sql.elements import quoted_name from ..util.typing import Final @@ -223,6 +225,10 @@ class DefaultDialect(Dialect): use_insertmanyvalues_wo_returning: bool = False + insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( + InsertmanyvaluesSentinelOpts.NOT_SUPPORTED + ) + insertmanyvalues_page_size: int = 1000 insertmanyvalues_max_parameters = 32700 @@ -369,13 +375,42 @@ class DefaultDialect(Dialect): and self.delete_returning ) - @property + @util.memoized_property def insert_executemany_returning(self): - return ( - self.insert_returning - and self.supports_multivalues_insert - and self.use_insertmanyvalues - ) + """Default implementation for insert_executemany_returning, if not + otherwise overridden by the specific dialect. + + The default dialect determines "insert_executemany_returning" is + available if the dialect in use has opted into using the + "use_insertmanyvalues" feature. If they haven't opted into that, then + this attribute is False, unless the dialect in question overrides this + and provides some other implementation (such as the Oracle dialect). + + """ + return self.insert_returning and self.use_insertmanyvalues + + @util.memoized_property + def insert_executemany_returning_sort_by_parameter_order(self): + """Default implementation for + insert_executemany_returning_deterministic_order, if not otherwise + overridden by the specific dialect. + + The default dialect determines "insert_executemany_returning" can have + deterministic order only if the dialect in use has opted into using the + "use_insertmanyvalues" feature, which implements deterministic ordering + using client side sentinel columns only by default. The + "insertmanyvalues" feature also features alternate forms that can + use server-generated PK values as "sentinels", but those are only + used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` + bitflag enables those alternate SQL forms, which are disabled + by default. + + If the dialect in use hasn't opted into that, then this attribute is + False, unless the dialect in question overrides this and provides some + other implementation (such as the Oracle dialect). + + """ + return self.insert_returning and self.use_insertmanyvalues update_executemany_returning = False delete_executemany_returning = False @@ -725,20 +760,156 @@ class DefaultDialect(Dialect): context = cast(DefaultExecutionContext, context) compiled = cast(SQLCompiler, context.compiled) + imv = compiled._insertmanyvalues + assert imv is not None + is_returning: Final[bool] = bool(compiled.effective_returning) batch_size = context.execution_options.get( "insertmanyvalues_page_size", self.insertmanyvalues_page_size ) + sentinel_value_resolvers = None + if is_returning: - context._insertmanyvalues_rows = result = [] + result: Optional[List[Any]] = [] + context._insertmanyvalues_rows = result + + sort_by_parameter_order = imv.sort_by_parameter_order - for batch_rec in compiled._deliver_insertmanyvalues_batches( - statement, parameters, generic_setinputsizes, batch_size + if imv.num_sentinel_columns: + sentinel_value_resolvers = ( + compiled._imv_sentinel_value_resolvers + ) + else: + sort_by_parameter_order = False + result = None + + for imv_batch in compiled._deliver_insertmanyvalues_batches( + statement, + parameters, + generic_setinputsizes, + batch_size, + sort_by_parameter_order, ): - yield batch_rec + yield imv_batch + if is_returning: - result.extend(cursor.fetchall()) + rows = context.fetchall_for_returning(cursor) + + # I would have thought "is_returning: Final[bool]" + # would have assured this but pylance thinks not + assert result is not None + + if imv.num_sentinel_columns and not imv_batch.is_downgraded: + composite_sentinel = imv.num_sentinel_columns > 1 + if imv.implicit_sentinel: + # for implicit sentinel, which is currently single-col + # integer autoincrement, do a simple sort. + assert not composite_sentinel + result.extend( + sorted(rows, key=operator.itemgetter(-1)) + ) + continue + + # otherwise, create dictionaries to match up batches + # with parameters + assert imv.sentinel_param_keys + + if composite_sentinel: + _nsc = imv.num_sentinel_columns + rows_by_sentinel = { + tuple(row[-_nsc:]): row for row in rows + } + else: + rows_by_sentinel = {row[-1]: row for row in rows} + + if len(rows_by_sentinel) != len(imv_batch.batch): + # see test_insert_exec.py:: + # IMVSentinelTest::test_sentinel_incorrect_rowcount + # for coverage / demonstration + raise exc.InvalidRequestError( + f"Sentinel-keyed result set did not produce " + f"correct number of rows {len(imv_batch.batch)}; " + "produced " + f"{len(rows_by_sentinel)}. Please ensure the " + "sentinel column is fully unique and populated in " + "all cases." + ) + + try: + if composite_sentinel: + if sentinel_value_resolvers: + # composite sentinel (PK) with value resolvers + ordered_rows = [ + rows_by_sentinel[ + tuple( + _resolver(parameters[_spk]) # type: ignore # noqa: E501 + if _resolver + else parameters[_spk] # type: ignore # noqa: E501 + for _resolver, _spk in zip( + sentinel_value_resolvers, + imv.sentinel_param_keys, + ) + ) + ] + for parameters in imv_batch.batch + ] + else: + # composite sentinel (PK) with no value + # resolvers + ordered_rows = [ + rows_by_sentinel[ + tuple( + parameters[_spk] # type: ignore + for _spk in imv.sentinel_param_keys + ) + ] + for parameters in imv_batch.batch + ] + else: + _sentinel_param_key = imv.sentinel_param_keys[0] + if ( + sentinel_value_resolvers + and sentinel_value_resolvers[0] + ): + # single-column sentinel with value resolver + _sentinel_value_resolver = ( + sentinel_value_resolvers[0] + ) + ordered_rows = [ + rows_by_sentinel[ + _sentinel_value_resolver( + parameters[_sentinel_param_key] # type: ignore # noqa: E501 + ) + ] + for parameters in imv_batch.batch + ] + else: + # single-column sentinel with no value resolver + ordered_rows = [ + rows_by_sentinel[ + parameters[_sentinel_param_key] # type: ignore # noqa: E501 + ] + for parameters in imv_batch.batch + ] + except KeyError as ke: + # see test_insert_exec.py:: + # IMVSentinelTest::test_sentinel_cant_match_keys + # for coverage / demonstration + raise exc.InvalidRequestError( + f"Can't match sentinel values in result set to " + f"parameter sets; key {ke.args[0]!r} was not " + "found. " + "There may be a mismatch between the datatype " + "passed to the DBAPI driver vs. that which it " + "returns in a result row. Try using a different " + "datatype, such as integer" + ) from ke + + result.extend(ordered_rows) + + else: + result.extend(rows) def do_executemany(self, cursor, statement, parameters, context=None): cursor.executemany(statement, parameters) @@ -1043,6 +1214,7 @@ class DefaultExecutionContext(ExecutionContext): _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None + _num_sentinel_cols: int = 0 @classmethod def _init_ddl( @@ -1152,6 +1324,17 @@ class DefaultExecutionContext(ExecutionContext): ) elif ( ii + and dml_statement._sort_by_parameter_order + and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 + ): + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "INSERT..RETURNING with deterministic row ordering " + "when executemany is used" + ) + elif ( + ii and self.dialect.use_insertmanyvalues and not compiled._insertmanyvalues ): @@ -1194,6 +1377,10 @@ class DefaultExecutionContext(ExecutionContext): if len(parameters) > 1: if self.isinsert and compiled._insertmanyvalues: self.execute_style = ExecuteStyle.INSERTMANYVALUES + + imv = compiled._insertmanyvalues + if imv.sentinel_columns is not None: + self._num_sentinel_cols = imv.num_sentinel_columns else: self.execute_style = ExecuteStyle.EXECUTEMANY @@ -1525,6 +1712,9 @@ class DefaultExecutionContext(ExecutionContext): self._is_server_side = False return self.create_default_cursor() + def fetchall_for_returning(self, cursor): + return cursor.fetchall() + def create_default_cursor(self): return self._dbapi_connection.cursor() @@ -1689,6 +1879,13 @@ class DefaultExecutionContext(ExecutionContext): ) if cursor_description is None: strategy = _cursor._NO_CURSOR_DML + elif self._num_sentinel_cols: + assert self.execute_style is ExecuteStyle.INSERTMANYVALUES + if cursor_description: + # strip out the sentinel columns from cursor description + cursor_description = cursor_description[ + 0 : -(self._num_sentinel_cols) + ] result: _cursor.CursorResult[Any] = _cursor.CursorResult( self, strategy, cursor_description @@ -2059,21 +2256,14 @@ class DefaultExecutionContext(ExecutionContext): key_getter = compiled._within_exec_param_key_getter - # pre-determine scalar Python-side defaults - # to avoid many calls of get_insert_default()/ - # get_update_default() + sentinel_counter = 0 + if compiled.insert_prefetch: prefetch_recs = [ ( c, key_getter(c), - ( - c.default.arg, # type: ignore - c.default.is_scalar, - c.default.is_callable, - ) - if c.default and c.default.has_arg - else (None, None, None), + c._default_description_tuple, self.get_insert_default, ) for c in compiled.insert_prefetch @@ -2083,13 +2273,7 @@ class DefaultExecutionContext(ExecutionContext): ( c, key_getter(c), - ( - c.onupdate.arg, # type: ignore - c.onupdate.is_scalar, - c.onupdate.is_callable, - ) - if c.onupdate and c.onupdate.has_arg - else (None, None, None), + c._onupdate_description_tuple, self.get_update_default, ) for c in compiled.update_prefetch @@ -2103,10 +2287,13 @@ class DefaultExecutionContext(ExecutionContext): for ( c, param_key, - (arg, is_scalar, is_callable), + (arg, is_scalar, is_callable, is_sentinel), fallback, ) in prefetch_recs: - if is_scalar: + if is_sentinel: + param[param_key] = sentinel_counter + sentinel_counter += 1 + elif is_scalar: param[param_key] = arg elif is_callable: self.current_column = c diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 254aba4bc..0216c155d 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -55,8 +55,10 @@ if TYPE_CHECKING: from ..event import dispatcher from ..exc import StatementError from ..sql import Executable + from ..sql.compiler import _InsertManyValuesBatch from ..sql.compiler import DDLCompiler from ..sql.compiler import IdentifierPreparer + from ..sql.compiler import InsertmanyvaluesSentinelOpts from ..sql.compiler import Linting from ..sql.compiler import SQLCompiler from ..sql.elements import BindParameter @@ -236,14 +238,16 @@ _DBAPIMultiExecuteParams = Union[ _DBAPIAnyExecuteParams = Union[ _DBAPIMultiExecuteParams, _DBAPISingleExecuteParams ] -_DBAPICursorDescription = Tuple[ - str, - "DBAPIType", - Optional[int], - Optional[int], - Optional[int], - Optional[int], - Optional[bool], +_DBAPICursorDescription = Sequence[ + Tuple[ + str, + "DBAPIType", + Optional[int], + Optional[int], + Optional[int], + Optional[int], + Optional[bool], + ] ] _AnySingleExecuteParams = _DBAPISingleExecuteParams @@ -609,9 +613,21 @@ class BindTyping(Enum): aren't. When RENDER_CASTS is used, the compiler will invoke the - :meth:`.SQLCompiler.render_bind_cast` method for each - :class:`.BindParameter` object whose dialect-level type sets the - :attr:`.TypeEngine.render_bind_cast` attribute. + :meth:`.SQLCompiler.render_bind_cast` method for the rendered + string representation of each :class:`.BindParameter` object whose + dialect-level type sets the :attr:`.TypeEngine.render_bind_cast` attribute. + + The :meth:`.SQLCompiler.render_bind_cast` is also used to render casts + for one form of "insertmanyvalues" query, when both + :attr:`.InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT` and + :attr:`.InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS` are set, + where the casts are applied to the intermediary columns e.g. + "INSERT INTO t (a, b, c) SELECT p0::TYP, p1::TYP, p2::TYP " + "FROM (VALUES (?, ?), (?, ?), ...)". + + .. versionadded:: 2.0.10 - :meth:`.SQLCompiler.render_bind_cast` is now + used within some elements of the "insertmanyvalues" implementation. + """ @@ -838,6 +854,14 @@ class Dialect(EventTarget): """ + insert_executemany_returning_sort_by_parameter_order: bool + """dialect / driver / database supports some means of providing + INSERT...RETURNING support when dialect.do_executemany() is used + along with the :paramref:`_dml.Insert.returning.sort_by_parameter_order` + parameter being set. + + """ + update_executemany_returning: bool """dialect supports UPDATE..RETURNING with executemany.""" @@ -881,6 +905,23 @@ class Dialect(EventTarget): .. versionadded:: 2.0 + .. seealso:: + + :ref:`engine_insertmanyvalues` + + """ + + insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts + """Options indicating the database supports a form of bulk INSERT where + the autoincrement integer primary key can be reliably used as an ordering + for INSERTed rows. + + .. versionadded:: 2.0.10 + + .. seealso:: + + :ref:`engine_insertmanyvalues_returning_order` + """ insertmanyvalues_page_size: int @@ -2116,15 +2157,7 @@ class Dialect(EventTarget): parameters: _DBAPIMultiExecuteParams, generic_setinputsizes: Optional[_GenericSetInputSizesType], context: ExecutionContext, - ) -> Iterator[ - Tuple[ - str, - _DBAPISingleExecuteParams, - _GenericSetInputSizesType, - int, - int, - ] - ]: + ) -> Iterator[_InsertManyValuesBatch]: """convert executemany parameters for an INSERT into an iterator of statement/single execute values, used by the insertmanyvalues feature. @@ -3112,6 +3145,24 @@ class ExecutionContext: raise NotImplementedError() + def fetchall_for_returning(self, cursor: DBAPICursor) -> Sequence[Any]: + """For a RETURNING result, deliver cursor.fetchall() from the + DBAPI cursor. + + This is a dialect-specific hook for dialects that have special + considerations when calling upon the rows delivered for a + "RETURNING" statement. Default implementation is + ``cursor.fetchall()``. + + This hook is currently used only by the :term:`insertmanyvalues` + feature. Dialects that don't set ``use_insertmanyvalues=True`` + don't need to consider this hook. + + .. versionadded:: 2.0.10 + + """ + raise NotImplementedError() + class ConnectionEventsTarget(EventTarget): """An object which can accept events from :class:`.ConnectionEvents`. |
