diff options
Diffstat (limited to 'lib/sqlalchemy/engine/default.py')
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 247 |
1 files changed, 217 insertions, 30 deletions
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 462473de2..8992334ee 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -17,6 +17,7 @@ as the base class for their own corresponding classes. from __future__ import annotations import functools +import operator import random import re from time import perf_counter @@ -60,6 +61,7 @@ from ..sql import type_api from ..sql._typing import is_tuple_type from ..sql.base import _NoArg from ..sql.compiler import DDLCompiler +from ..sql.compiler import InsertmanyvaluesSentinelOpts from ..sql.compiler import SQLCompiler from ..sql.elements import quoted_name from ..util.typing import Final @@ -223,6 +225,10 @@ class DefaultDialect(Dialect): use_insertmanyvalues_wo_returning: bool = False + insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( + InsertmanyvaluesSentinelOpts.NOT_SUPPORTED + ) + insertmanyvalues_page_size: int = 1000 insertmanyvalues_max_parameters = 32700 @@ -369,13 +375,42 @@ class DefaultDialect(Dialect): and self.delete_returning ) - @property + @util.memoized_property def insert_executemany_returning(self): - return ( - self.insert_returning - and self.supports_multivalues_insert - and self.use_insertmanyvalues - ) + """Default implementation for insert_executemany_returning, if not + otherwise overridden by the specific dialect. + + The default dialect determines "insert_executemany_returning" is + available if the dialect in use has opted into using the + "use_insertmanyvalues" feature. If they haven't opted into that, then + this attribute is False, unless the dialect in question overrides this + and provides some other implementation (such as the Oracle dialect). + + """ + return self.insert_returning and self.use_insertmanyvalues + + @util.memoized_property + def insert_executemany_returning_sort_by_parameter_order(self): + """Default implementation for + insert_executemany_returning_deterministic_order, if not otherwise + overridden by the specific dialect. + + The default dialect determines "insert_executemany_returning" can have + deterministic order only if the dialect in use has opted into using the + "use_insertmanyvalues" feature, which implements deterministic ordering + using client side sentinel columns only by default. The + "insertmanyvalues" feature also features alternate forms that can + use server-generated PK values as "sentinels", but those are only + used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` + bitflag enables those alternate SQL forms, which are disabled + by default. + + If the dialect in use hasn't opted into that, then this attribute is + False, unless the dialect in question overrides this and provides some + other implementation (such as the Oracle dialect). + + """ + return self.insert_returning and self.use_insertmanyvalues update_executemany_returning = False delete_executemany_returning = False @@ -725,20 +760,156 @@ class DefaultDialect(Dialect): context = cast(DefaultExecutionContext, context) compiled = cast(SQLCompiler, context.compiled) + imv = compiled._insertmanyvalues + assert imv is not None + is_returning: Final[bool] = bool(compiled.effective_returning) batch_size = context.execution_options.get( "insertmanyvalues_page_size", self.insertmanyvalues_page_size ) + sentinel_value_resolvers = None + if is_returning: - context._insertmanyvalues_rows = result = [] + result: Optional[List[Any]] = [] + context._insertmanyvalues_rows = result + + sort_by_parameter_order = imv.sort_by_parameter_order - for batch_rec in compiled._deliver_insertmanyvalues_batches( - statement, parameters, generic_setinputsizes, batch_size + if imv.num_sentinel_columns: + sentinel_value_resolvers = ( + compiled._imv_sentinel_value_resolvers + ) + else: + sort_by_parameter_order = False + result = None + + for imv_batch in compiled._deliver_insertmanyvalues_batches( + statement, + parameters, + generic_setinputsizes, + batch_size, + sort_by_parameter_order, ): - yield batch_rec + yield imv_batch + if is_returning: - result.extend(cursor.fetchall()) + rows = context.fetchall_for_returning(cursor) + + # I would have thought "is_returning: Final[bool]" + # would have assured this but pylance thinks not + assert result is not None + + if imv.num_sentinel_columns and not imv_batch.is_downgraded: + composite_sentinel = imv.num_sentinel_columns > 1 + if imv.implicit_sentinel: + # for implicit sentinel, which is currently single-col + # integer autoincrement, do a simple sort. + assert not composite_sentinel + result.extend( + sorted(rows, key=operator.itemgetter(-1)) + ) + continue + + # otherwise, create dictionaries to match up batches + # with parameters + assert imv.sentinel_param_keys + + if composite_sentinel: + _nsc = imv.num_sentinel_columns + rows_by_sentinel = { + tuple(row[-_nsc:]): row for row in rows + } + else: + rows_by_sentinel = {row[-1]: row for row in rows} + + if len(rows_by_sentinel) != len(imv_batch.batch): + # see test_insert_exec.py:: + # IMVSentinelTest::test_sentinel_incorrect_rowcount + # for coverage / demonstration + raise exc.InvalidRequestError( + f"Sentinel-keyed result set did not produce " + f"correct number of rows {len(imv_batch.batch)}; " + "produced " + f"{len(rows_by_sentinel)}. Please ensure the " + "sentinel column is fully unique and populated in " + "all cases." + ) + + try: + if composite_sentinel: + if sentinel_value_resolvers: + # composite sentinel (PK) with value resolvers + ordered_rows = [ + rows_by_sentinel[ + tuple( + _resolver(parameters[_spk]) # type: ignore # noqa: E501 + if _resolver + else parameters[_spk] # type: ignore # noqa: E501 + for _resolver, _spk in zip( + sentinel_value_resolvers, + imv.sentinel_param_keys, + ) + ) + ] + for parameters in imv_batch.batch + ] + else: + # composite sentinel (PK) with no value + # resolvers + ordered_rows = [ + rows_by_sentinel[ + tuple( + parameters[_spk] # type: ignore + for _spk in imv.sentinel_param_keys + ) + ] + for parameters in imv_batch.batch + ] + else: + _sentinel_param_key = imv.sentinel_param_keys[0] + if ( + sentinel_value_resolvers + and sentinel_value_resolvers[0] + ): + # single-column sentinel with value resolver + _sentinel_value_resolver = ( + sentinel_value_resolvers[0] + ) + ordered_rows = [ + rows_by_sentinel[ + _sentinel_value_resolver( + parameters[_sentinel_param_key] # type: ignore # noqa: E501 + ) + ] + for parameters in imv_batch.batch + ] + else: + # single-column sentinel with no value resolver + ordered_rows = [ + rows_by_sentinel[ + parameters[_sentinel_param_key] # type: ignore # noqa: E501 + ] + for parameters in imv_batch.batch + ] + except KeyError as ke: + # see test_insert_exec.py:: + # IMVSentinelTest::test_sentinel_cant_match_keys + # for coverage / demonstration + raise exc.InvalidRequestError( + f"Can't match sentinel values in result set to " + f"parameter sets; key {ke.args[0]!r} was not " + "found. " + "There may be a mismatch between the datatype " + "passed to the DBAPI driver vs. that which it " + "returns in a result row. Try using a different " + "datatype, such as integer" + ) from ke + + result.extend(ordered_rows) + + else: + result.extend(rows) def do_executemany(self, cursor, statement, parameters, context=None): cursor.executemany(statement, parameters) @@ -1043,6 +1214,7 @@ class DefaultExecutionContext(ExecutionContext): _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None + _num_sentinel_cols: int = 0 @classmethod def _init_ddl( @@ -1152,6 +1324,17 @@ class DefaultExecutionContext(ExecutionContext): ) elif ( ii + and dml_statement._sort_by_parameter_order + and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501 + ): + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "INSERT..RETURNING with deterministic row ordering " + "when executemany is used" + ) + elif ( + ii and self.dialect.use_insertmanyvalues and not compiled._insertmanyvalues ): @@ -1194,6 +1377,10 @@ class DefaultExecutionContext(ExecutionContext): if len(parameters) > 1: if self.isinsert and compiled._insertmanyvalues: self.execute_style = ExecuteStyle.INSERTMANYVALUES + + imv = compiled._insertmanyvalues + if imv.sentinel_columns is not None: + self._num_sentinel_cols = imv.num_sentinel_columns else: self.execute_style = ExecuteStyle.EXECUTEMANY @@ -1525,6 +1712,9 @@ class DefaultExecutionContext(ExecutionContext): self._is_server_side = False return self.create_default_cursor() + def fetchall_for_returning(self, cursor): + return cursor.fetchall() + def create_default_cursor(self): return self._dbapi_connection.cursor() @@ -1689,6 +1879,13 @@ class DefaultExecutionContext(ExecutionContext): ) if cursor_description is None: strategy = _cursor._NO_CURSOR_DML + elif self._num_sentinel_cols: + assert self.execute_style is ExecuteStyle.INSERTMANYVALUES + if cursor_description: + # strip out the sentinel columns from cursor description + cursor_description = cursor_description[ + 0 : -(self._num_sentinel_cols) + ] result: _cursor.CursorResult[Any] = _cursor.CursorResult( self, strategy, cursor_description @@ -2059,21 +2256,14 @@ class DefaultExecutionContext(ExecutionContext): key_getter = compiled._within_exec_param_key_getter - # pre-determine scalar Python-side defaults - # to avoid many calls of get_insert_default()/ - # get_update_default() + sentinel_counter = 0 + if compiled.insert_prefetch: prefetch_recs = [ ( c, key_getter(c), - ( - c.default.arg, # type: ignore - c.default.is_scalar, - c.default.is_callable, - ) - if c.default and c.default.has_arg - else (None, None, None), + c._default_description_tuple, self.get_insert_default, ) for c in compiled.insert_prefetch @@ -2083,13 +2273,7 @@ class DefaultExecutionContext(ExecutionContext): ( c, key_getter(c), - ( - c.onupdate.arg, # type: ignore - c.onupdate.is_scalar, - c.onupdate.is_callable, - ) - if c.onupdate and c.onupdate.has_arg - else (None, None, None), + c._onupdate_description_tuple, self.get_update_default, ) for c in compiled.update_prefetch @@ -2103,10 +2287,13 @@ class DefaultExecutionContext(ExecutionContext): for ( c, param_key, - (arg, is_scalar, is_callable), + (arg, is_scalar, is_callable, is_sentinel), fallback, ) in prefetch_recs: - if is_scalar: + if is_sentinel: + param[param_key] = sentinel_counter + sentinel_counter += 1 + elif is_scalar: param[param_key] = arg elif is_callable: self.current_column = c |
