summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2023-04-21 16:51:19 +0000
committerGerrit Code Review <gerrit@bbpush.zzzcomputing.com>2023-04-21 16:51:19 +0000
commitc84b3bf198c75ad4f42b0f83d482e480200e6d16 (patch)
tree86e991ad8f43515ec7948ff809f44bb7d8b301fa /lib/sqlalchemy/engine
parent95628d9707cdfbfdd229b2acee02fbadfbe7ced0 (diff)
parentcf6872d3bdf1a8a9613e853694acc2b1e6f06f51 (diff)
downloadsqlalchemy-c84b3bf198c75ad4f42b0f83d482e480200e6d16.tar.gz
Merge "add deterministic imv returning ordering using sentinel columns" into main
Diffstat (limited to 'lib/sqlalchemy/engine')
-rw-r--r--lib/sqlalchemy/engine/base.py48
-rw-r--r--lib/sqlalchemy/engine/cursor.py12
-rw-r--r--lib/sqlalchemy/engine/default.py247
-rw-r--r--lib/sqlalchemy/engine/interfaces.py91
4 files changed, 328 insertions, 70 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index ba2c44ed7..dac7c9473 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -2020,13 +2020,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if self._echo:
stats = context._get_cache_stats() + " (insertmanyvalues)"
- for (
- sub_stmt,
- sub_params,
- setinputsizes,
- batchnum,
- totalbatches,
- ) in dialect._deliver_insertmanyvalues_batches(
+
+ for imv_batch in dialect._deliver_insertmanyvalues_batches(
cursor,
str_statement,
effective_parameters,
@@ -2034,20 +2029,25 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
context,
):
- if setinputsizes:
+ if imv_batch.processed_setinputsizes:
try:
dialect.do_set_input_sizes(
- context.cursor, setinputsizes, context
+ context.cursor,
+ imv_batch.processed_setinputsizes,
+ context,
)
except BaseException as e:
self._handle_dbapi_exception(
e,
- sql_util._long_statement(sub_stmt),
- sub_params,
+ sql_util._long_statement(imv_batch.replaced_statement),
+ imv_batch.replaced_parameters,
None,
context,
)
+ sub_stmt = imv_batch.replaced_statement
+ sub_params = imv_batch.replaced_parameters
+
if engine_events:
for fn in self.dispatch.before_cursor_execute:
sub_stmt, sub_params = fn(
@@ -2063,11 +2063,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self._log_info(sql_util._long_statement(sub_stmt))
- if batchnum > 1:
- stats = (
- f"insertmanyvalues batch {batchnum} "
- f"of {totalbatches}"
- )
+ imv_stats = f""" {
+ imv_batch.batchnum}/{imv_batch.total_batches} ({
+ 'ordered'
+ if imv_batch.rows_sorted else 'unordered'
+ }{
+ '; batch not supported'
+ if imv_batch.is_downgraded
+ else ''
+ })"""
+
+ if imv_batch.batchnum == 1:
+ stats += imv_stats
+ else:
+ stats = f"insertmanyvalues{imv_stats}"
if not self.engine.hide_parameters:
self._log_info(
@@ -2096,7 +2105,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
):
break
else:
- dialect.do_execute(cursor, sub_stmt, sub_params, context)
+ dialect.do_execute(
+ cursor,
+ sub_stmt,
+ sub_params,
+ context,
+ )
except BaseException as e:
self._handle_dbapi_exception(
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index 1f171ddb0..aaf2c1918 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -1748,13 +1748,18 @@ class CursorResult(Result[_T]):
position in the result.
The expected use case here is so that multiple INSERT..RETURNING
- statements against different tables can produce a single result
- that looks like a JOIN of those two tables.
+ statements (which definitely need to be sorted) against different
+ tables can produce a single result that looks like a JOIN of those two
+ tables.
E.g.::
r1 = connection.execute(
- users.insert().returning(users.c.user_name, users.c.user_id),
+ users.insert().returning(
+ users.c.user_name,
+ users.c.user_id,
+ sort_by_parameter_order=True
+ ),
user_values
)
@@ -1763,6 +1768,7 @@ class CursorResult(Result[_T]):
addresses.c.address_id,
addresses.c.address,
addresses.c.user_id,
+ sort_by_parameter_order=True
),
address_values
)
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 462473de2..8992334ee 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -17,6 +17,7 @@ as the base class for their own corresponding classes.
from __future__ import annotations
import functools
+import operator
import random
import re
from time import perf_counter
@@ -60,6 +61,7 @@ from ..sql import type_api
from ..sql._typing import is_tuple_type
from ..sql.base import _NoArg
from ..sql.compiler import DDLCompiler
+from ..sql.compiler import InsertmanyvaluesSentinelOpts
from ..sql.compiler import SQLCompiler
from ..sql.elements import quoted_name
from ..util.typing import Final
@@ -223,6 +225,10 @@ class DefaultDialect(Dialect):
use_insertmanyvalues_wo_returning: bool = False
+ insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
+ InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
+ )
+
insertmanyvalues_page_size: int = 1000
insertmanyvalues_max_parameters = 32700
@@ -369,13 +375,42 @@ class DefaultDialect(Dialect):
and self.delete_returning
)
- @property
+ @util.memoized_property
def insert_executemany_returning(self):
- return (
- self.insert_returning
- and self.supports_multivalues_insert
- and self.use_insertmanyvalues
- )
+ """Default implementation for insert_executemany_returning, if not
+ otherwise overridden by the specific dialect.
+
+ The default dialect determines "insert_executemany_returning" is
+ available if the dialect in use has opted into using the
+ "use_insertmanyvalues" feature. If they haven't opted into that, then
+ this attribute is False, unless the dialect in question overrides this
+ and provides some other implementation (such as the Oracle dialect).
+
+ """
+ return self.insert_returning and self.use_insertmanyvalues
+
+ @util.memoized_property
+ def insert_executemany_returning_sort_by_parameter_order(self):
+ """Default implementation for
+ insert_executemany_returning_deterministic_order, if not otherwise
+ overridden by the specific dialect.
+
+ The default dialect determines "insert_executemany_returning" can have
+ deterministic order only if the dialect in use has opted into using the
+ "use_insertmanyvalues" feature, which implements deterministic ordering
+ using client side sentinel columns only by default. The
+ "insertmanyvalues" feature also features alternate forms that can
+ use server-generated PK values as "sentinels", but those are only
+ used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
+ bitflag enables those alternate SQL forms, which are disabled
+ by default.
+
+ If the dialect in use hasn't opted into that, then this attribute is
+ False, unless the dialect in question overrides this and provides some
+ other implementation (such as the Oracle dialect).
+
+ """
+ return self.insert_returning and self.use_insertmanyvalues
update_executemany_returning = False
delete_executemany_returning = False
@@ -725,20 +760,156 @@ class DefaultDialect(Dialect):
context = cast(DefaultExecutionContext, context)
compiled = cast(SQLCompiler, context.compiled)
+ imv = compiled._insertmanyvalues
+ assert imv is not None
+
is_returning: Final[bool] = bool(compiled.effective_returning)
batch_size = context.execution_options.get(
"insertmanyvalues_page_size", self.insertmanyvalues_page_size
)
+ sentinel_value_resolvers = None
+
if is_returning:
- context._insertmanyvalues_rows = result = []
+ result: Optional[List[Any]] = []
+ context._insertmanyvalues_rows = result
+
+ sort_by_parameter_order = imv.sort_by_parameter_order
- for batch_rec in compiled._deliver_insertmanyvalues_batches(
- statement, parameters, generic_setinputsizes, batch_size
+ if imv.num_sentinel_columns:
+ sentinel_value_resolvers = (
+ compiled._imv_sentinel_value_resolvers
+ )
+ else:
+ sort_by_parameter_order = False
+ result = None
+
+ for imv_batch in compiled._deliver_insertmanyvalues_batches(
+ statement,
+ parameters,
+ generic_setinputsizes,
+ batch_size,
+ sort_by_parameter_order,
):
- yield batch_rec
+ yield imv_batch
+
if is_returning:
- result.extend(cursor.fetchall())
+ rows = context.fetchall_for_returning(cursor)
+
+ # I would have thought "is_returning: Final[bool]"
+ # would have assured this but pylance thinks not
+ assert result is not None
+
+ if imv.num_sentinel_columns and not imv_batch.is_downgraded:
+ composite_sentinel = imv.num_sentinel_columns > 1
+ if imv.implicit_sentinel:
+ # for implicit sentinel, which is currently single-col
+ # integer autoincrement, do a simple sort.
+ assert not composite_sentinel
+ result.extend(
+ sorted(rows, key=operator.itemgetter(-1))
+ )
+ continue
+
+ # otherwise, create dictionaries to match up batches
+ # with parameters
+ assert imv.sentinel_param_keys
+
+ if composite_sentinel:
+ _nsc = imv.num_sentinel_columns
+ rows_by_sentinel = {
+ tuple(row[-_nsc:]): row for row in rows
+ }
+ else:
+ rows_by_sentinel = {row[-1]: row for row in rows}
+
+ if len(rows_by_sentinel) != len(imv_batch.batch):
+ # see test_insert_exec.py::
+ # IMVSentinelTest::test_sentinel_incorrect_rowcount
+ # for coverage / demonstration
+ raise exc.InvalidRequestError(
+ f"Sentinel-keyed result set did not produce "
+ f"correct number of rows {len(imv_batch.batch)}; "
+ "produced "
+ f"{len(rows_by_sentinel)}. Please ensure the "
+ "sentinel column is fully unique and populated in "
+ "all cases."
+ )
+
+ try:
+ if composite_sentinel:
+ if sentinel_value_resolvers:
+ # composite sentinel (PK) with value resolvers
+ ordered_rows = [
+ rows_by_sentinel[
+ tuple(
+ _resolver(parameters[_spk]) # type: ignore # noqa: E501
+ if _resolver
+ else parameters[_spk] # type: ignore # noqa: E501
+ for _resolver, _spk in zip(
+ sentinel_value_resolvers,
+ imv.sentinel_param_keys,
+ )
+ )
+ ]
+ for parameters in imv_batch.batch
+ ]
+ else:
+ # composite sentinel (PK) with no value
+ # resolvers
+ ordered_rows = [
+ rows_by_sentinel[
+ tuple(
+ parameters[_spk] # type: ignore
+ for _spk in imv.sentinel_param_keys
+ )
+ ]
+ for parameters in imv_batch.batch
+ ]
+ else:
+ _sentinel_param_key = imv.sentinel_param_keys[0]
+ if (
+ sentinel_value_resolvers
+ and sentinel_value_resolvers[0]
+ ):
+ # single-column sentinel with value resolver
+ _sentinel_value_resolver = (
+ sentinel_value_resolvers[0]
+ )
+ ordered_rows = [
+ rows_by_sentinel[
+ _sentinel_value_resolver(
+ parameters[_sentinel_param_key] # type: ignore # noqa: E501
+ )
+ ]
+ for parameters in imv_batch.batch
+ ]
+ else:
+ # single-column sentinel with no value resolver
+ ordered_rows = [
+ rows_by_sentinel[
+ parameters[_sentinel_param_key] # type: ignore # noqa: E501
+ ]
+ for parameters in imv_batch.batch
+ ]
+ except KeyError as ke:
+ # see test_insert_exec.py::
+ # IMVSentinelTest::test_sentinel_cant_match_keys
+ # for coverage / demonstration
+ raise exc.InvalidRequestError(
+ f"Can't match sentinel values in result set to "
+ f"parameter sets; key {ke.args[0]!r} was not "
+ "found. "
+ "There may be a mismatch between the datatype "
+ "passed to the DBAPI driver vs. that which it "
+ "returns in a result row. Try using a different "
+ "datatype, such as integer"
+ ) from ke
+
+ result.extend(ordered_rows)
+
+ else:
+ result.extend(rows)
def do_executemany(self, cursor, statement, parameters, context=None):
cursor.executemany(statement, parameters)
@@ -1043,6 +1214,7 @@ class DefaultExecutionContext(ExecutionContext):
_empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
_insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
+ _num_sentinel_cols: int = 0
@classmethod
def _init_ddl(
@@ -1152,6 +1324,17 @@ class DefaultExecutionContext(ExecutionContext):
)
elif (
ii
+ and dml_statement._sort_by_parameter_order
+ and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
+ ):
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "INSERT..RETURNING with deterministic row ordering "
+ "when executemany is used"
+ )
+ elif (
+ ii
and self.dialect.use_insertmanyvalues
and not compiled._insertmanyvalues
):
@@ -1194,6 +1377,10 @@ class DefaultExecutionContext(ExecutionContext):
if len(parameters) > 1:
if self.isinsert and compiled._insertmanyvalues:
self.execute_style = ExecuteStyle.INSERTMANYVALUES
+
+ imv = compiled._insertmanyvalues
+ if imv.sentinel_columns is not None:
+ self._num_sentinel_cols = imv.num_sentinel_columns
else:
self.execute_style = ExecuteStyle.EXECUTEMANY
@@ -1525,6 +1712,9 @@ class DefaultExecutionContext(ExecutionContext):
self._is_server_side = False
return self.create_default_cursor()
+ def fetchall_for_returning(self, cursor):
+ return cursor.fetchall()
+
def create_default_cursor(self):
return self._dbapi_connection.cursor()
@@ -1689,6 +1879,13 @@ class DefaultExecutionContext(ExecutionContext):
)
if cursor_description is None:
strategy = _cursor._NO_CURSOR_DML
+ elif self._num_sentinel_cols:
+ assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
+ if cursor_description:
+ # strip out the sentinel columns from cursor description
+ cursor_description = cursor_description[
+ 0 : -(self._num_sentinel_cols)
+ ]
result: _cursor.CursorResult[Any] = _cursor.CursorResult(
self, strategy, cursor_description
@@ -2059,21 +2256,14 @@ class DefaultExecutionContext(ExecutionContext):
key_getter = compiled._within_exec_param_key_getter
- # pre-determine scalar Python-side defaults
- # to avoid many calls of get_insert_default()/
- # get_update_default()
+ sentinel_counter = 0
+
if compiled.insert_prefetch:
prefetch_recs = [
(
c,
key_getter(c),
- (
- c.default.arg, # type: ignore
- c.default.is_scalar,
- c.default.is_callable,
- )
- if c.default and c.default.has_arg
- else (None, None, None),
+ c._default_description_tuple,
self.get_insert_default,
)
for c in compiled.insert_prefetch
@@ -2083,13 +2273,7 @@ class DefaultExecutionContext(ExecutionContext):
(
c,
key_getter(c),
- (
- c.onupdate.arg, # type: ignore
- c.onupdate.is_scalar,
- c.onupdate.is_callable,
- )
- if c.onupdate and c.onupdate.has_arg
- else (None, None, None),
+ c._onupdate_description_tuple,
self.get_update_default,
)
for c in compiled.update_prefetch
@@ -2103,10 +2287,13 @@ class DefaultExecutionContext(ExecutionContext):
for (
c,
param_key,
- (arg, is_scalar, is_callable),
+ (arg, is_scalar, is_callable, is_sentinel),
fallback,
) in prefetch_recs:
- if is_scalar:
+ if is_sentinel:
+ param[param_key] = sentinel_counter
+ sentinel_counter += 1
+ elif is_scalar:
param[param_key] = arg
elif is_callable:
self.current_column = c
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index 254aba4bc..0216c155d 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -55,8 +55,10 @@ if TYPE_CHECKING:
from ..event import dispatcher
from ..exc import StatementError
from ..sql import Executable
+ from ..sql.compiler import _InsertManyValuesBatch
from ..sql.compiler import DDLCompiler
from ..sql.compiler import IdentifierPreparer
+ from ..sql.compiler import InsertmanyvaluesSentinelOpts
from ..sql.compiler import Linting
from ..sql.compiler import SQLCompiler
from ..sql.elements import BindParameter
@@ -236,14 +238,16 @@ _DBAPIMultiExecuteParams = Union[
_DBAPIAnyExecuteParams = Union[
_DBAPIMultiExecuteParams, _DBAPISingleExecuteParams
]
-_DBAPICursorDescription = Tuple[
- str,
- "DBAPIType",
- Optional[int],
- Optional[int],
- Optional[int],
- Optional[int],
- Optional[bool],
+_DBAPICursorDescription = Sequence[
+ Tuple[
+ str,
+ "DBAPIType",
+ Optional[int],
+ Optional[int],
+ Optional[int],
+ Optional[int],
+ Optional[bool],
+ ]
]
_AnySingleExecuteParams = _DBAPISingleExecuteParams
@@ -609,9 +613,21 @@ class BindTyping(Enum):
aren't.
When RENDER_CASTS is used, the compiler will invoke the
- :meth:`.SQLCompiler.render_bind_cast` method for each
- :class:`.BindParameter` object whose dialect-level type sets the
- :attr:`.TypeEngine.render_bind_cast` attribute.
+ :meth:`.SQLCompiler.render_bind_cast` method for the rendered
+ string representation of each :class:`.BindParameter` object whose
+ dialect-level type sets the :attr:`.TypeEngine.render_bind_cast` attribute.
+
+ The :meth:`.SQLCompiler.render_bind_cast` is also used to render casts
+ for one form of "insertmanyvalues" query, when both
+ :attr:`.InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT` and
+ :attr:`.InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS` are set,
+ where the casts are applied to the intermediary columns e.g.
+ "INSERT INTO t (a, b, c) SELECT p0::TYP, p1::TYP, p2::TYP "
+ "FROM (VALUES (?, ?), (?, ?), ...)".
+
+ .. versionadded:: 2.0.10 - :meth:`.SQLCompiler.render_bind_cast` is now
+ used within some elements of the "insertmanyvalues" implementation.
+
"""
@@ -838,6 +854,14 @@ class Dialect(EventTarget):
"""
+ insert_executemany_returning_sort_by_parameter_order: bool
+ """dialect / driver / database supports some means of providing
+ INSERT...RETURNING support when dialect.do_executemany() is used
+ along with the :paramref:`_dml.Insert.returning.sort_by_parameter_order`
+ parameter being set.
+
+ """
+
update_executemany_returning: bool
"""dialect supports UPDATE..RETURNING with executemany."""
@@ -881,6 +905,23 @@ class Dialect(EventTarget):
.. versionadded:: 2.0
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
+ """
+
+ insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts
+ """Options indicating the database supports a form of bulk INSERT where
+ the autoincrement integer primary key can be reliably used as an ordering
+ for INSERTed rows.
+
+ .. versionadded:: 2.0.10
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues_returning_order`
+
"""
insertmanyvalues_page_size: int
@@ -2116,15 +2157,7 @@ class Dialect(EventTarget):
parameters: _DBAPIMultiExecuteParams,
generic_setinputsizes: Optional[_GenericSetInputSizesType],
context: ExecutionContext,
- ) -> Iterator[
- Tuple[
- str,
- _DBAPISingleExecuteParams,
- _GenericSetInputSizesType,
- int,
- int,
- ]
- ]:
+ ) -> Iterator[_InsertManyValuesBatch]:
"""convert executemany parameters for an INSERT into an iterator
of statement/single execute values, used by the insertmanyvalues
feature.
@@ -3112,6 +3145,24 @@ class ExecutionContext:
raise NotImplementedError()
+ def fetchall_for_returning(self, cursor: DBAPICursor) -> Sequence[Any]:
+ """For a RETURNING result, deliver cursor.fetchall() from the
+ DBAPI cursor.
+
+ This is a dialect-specific hook for dialects that have special
+ considerations when calling upon the rows delivered for a
+ "RETURNING" statement. Default implementation is
+ ``cursor.fetchall()``.
+
+ This hook is currently used only by the :term:`insertmanyvalues`
+ feature. Dialects that don't set ``use_insertmanyvalues=True``
+ don't need to consider this hook.
+
+ .. versionadded:: 2.0.10
+
+ """
+ raise NotImplementedError()
+
class ConnectionEventsTarget(EventTarget):
"""An object which can accept events from :class:`.ConnectionEvents`.