summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/default.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine/default.py')
-rw-r--r--lib/sqlalchemy/engine/default.py247
1 files changed, 217 insertions, 30 deletions
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 462473de2..8992334ee 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -17,6 +17,7 @@ as the base class for their own corresponding classes.
from __future__ import annotations
import functools
+import operator
import random
import re
from time import perf_counter
@@ -60,6 +61,7 @@ from ..sql import type_api
from ..sql._typing import is_tuple_type
from ..sql.base import _NoArg
from ..sql.compiler import DDLCompiler
+from ..sql.compiler import InsertmanyvaluesSentinelOpts
from ..sql.compiler import SQLCompiler
from ..sql.elements import quoted_name
from ..util.typing import Final
@@ -223,6 +225,10 @@ class DefaultDialect(Dialect):
use_insertmanyvalues_wo_returning: bool = False
+ insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
+ InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
+ )
+
insertmanyvalues_page_size: int = 1000
insertmanyvalues_max_parameters = 32700
@@ -369,13 +375,42 @@ class DefaultDialect(Dialect):
and self.delete_returning
)
- @property
+ @util.memoized_property
def insert_executemany_returning(self):
- return (
- self.insert_returning
- and self.supports_multivalues_insert
- and self.use_insertmanyvalues
- )
+ """Default implementation for insert_executemany_returning, if not
+ otherwise overridden by the specific dialect.
+
+ The default dialect determines "insert_executemany_returning" is
+ available if the dialect in use has opted into using the
+ "use_insertmanyvalues" feature. If they haven't opted into that, then
+ this attribute is False, unless the dialect in question overrides this
+ and provides some other implementation (such as the Oracle dialect).
+
+ """
+ return self.insert_returning and self.use_insertmanyvalues
+
+ @util.memoized_property
+ def insert_executemany_returning_sort_by_parameter_order(self):
+ """Default implementation for
+ insert_executemany_returning_deterministic_order, if not otherwise
+ overridden by the specific dialect.
+
+ The default dialect determines "insert_executemany_returning" can have
+ deterministic order only if the dialect in use has opted into using the
+ "use_insertmanyvalues" feature, which implements deterministic ordering
+ using client side sentinel columns only by default. The
+ "insertmanyvalues" feature also features alternate forms that can
+ use server-generated PK values as "sentinels", but those are only
+ used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
+ bitflag enables those alternate SQL forms, which are disabled
+ by default.
+
+ If the dialect in use hasn't opted into that, then this attribute is
+ False, unless the dialect in question overrides this and provides some
+ other implementation (such as the Oracle dialect).
+
+ """
+ return self.insert_returning and self.use_insertmanyvalues
update_executemany_returning = False
delete_executemany_returning = False
@@ -725,20 +760,156 @@ class DefaultDialect(Dialect):
context = cast(DefaultExecutionContext, context)
compiled = cast(SQLCompiler, context.compiled)
+ imv = compiled._insertmanyvalues
+ assert imv is not None
+
is_returning: Final[bool] = bool(compiled.effective_returning)
batch_size = context.execution_options.get(
"insertmanyvalues_page_size", self.insertmanyvalues_page_size
)
+ sentinel_value_resolvers = None
+
if is_returning:
- context._insertmanyvalues_rows = result = []
+ result: Optional[List[Any]] = []
+ context._insertmanyvalues_rows = result
+
+ sort_by_parameter_order = imv.sort_by_parameter_order
- for batch_rec in compiled._deliver_insertmanyvalues_batches(
- statement, parameters, generic_setinputsizes, batch_size
+ if imv.num_sentinel_columns:
+ sentinel_value_resolvers = (
+ compiled._imv_sentinel_value_resolvers
+ )
+ else:
+ sort_by_parameter_order = False
+ result = None
+
+ for imv_batch in compiled._deliver_insertmanyvalues_batches(
+ statement,
+ parameters,
+ generic_setinputsizes,
+ batch_size,
+ sort_by_parameter_order,
):
- yield batch_rec
+ yield imv_batch
+
if is_returning:
- result.extend(cursor.fetchall())
+ rows = context.fetchall_for_returning(cursor)
+
+ # I would have thought "is_returning: Final[bool]"
+ # would have assured this but pylance thinks not
+ assert result is not None
+
+ if imv.num_sentinel_columns and not imv_batch.is_downgraded:
+ composite_sentinel = imv.num_sentinel_columns > 1
+ if imv.implicit_sentinel:
+ # for implicit sentinel, which is currently single-col
+ # integer autoincrement, do a simple sort.
+ assert not composite_sentinel
+ result.extend(
+ sorted(rows, key=operator.itemgetter(-1))
+ )
+ continue
+
+ # otherwise, create dictionaries to match up batches
+ # with parameters
+ assert imv.sentinel_param_keys
+
+ if composite_sentinel:
+ _nsc = imv.num_sentinel_columns
+ rows_by_sentinel = {
+ tuple(row[-_nsc:]): row for row in rows
+ }
+ else:
+ rows_by_sentinel = {row[-1]: row for row in rows}
+
+ if len(rows_by_sentinel) != len(imv_batch.batch):
+ # see test_insert_exec.py::
+ # IMVSentinelTest::test_sentinel_incorrect_rowcount
+ # for coverage / demonstration
+ raise exc.InvalidRequestError(
+ f"Sentinel-keyed result set did not produce "
+ f"correct number of rows {len(imv_batch.batch)}; "
+ "produced "
+ f"{len(rows_by_sentinel)}. Please ensure the "
+ "sentinel column is fully unique and populated in "
+ "all cases."
+ )
+
+ try:
+ if composite_sentinel:
+ if sentinel_value_resolvers:
+ # composite sentinel (PK) with value resolvers
+ ordered_rows = [
+ rows_by_sentinel[
+ tuple(
+ _resolver(parameters[_spk]) # type: ignore # noqa: E501
+ if _resolver
+ else parameters[_spk] # type: ignore # noqa: E501
+ for _resolver, _spk in zip(
+ sentinel_value_resolvers,
+ imv.sentinel_param_keys,
+ )
+ )
+ ]
+ for parameters in imv_batch.batch
+ ]
+ else:
+ # composite sentinel (PK) with no value
+ # resolvers
+ ordered_rows = [
+ rows_by_sentinel[
+ tuple(
+ parameters[_spk] # type: ignore
+ for _spk in imv.sentinel_param_keys
+ )
+ ]
+ for parameters in imv_batch.batch
+ ]
+ else:
+ _sentinel_param_key = imv.sentinel_param_keys[0]
+ if (
+ sentinel_value_resolvers
+ and sentinel_value_resolvers[0]
+ ):
+ # single-column sentinel with value resolver
+ _sentinel_value_resolver = (
+ sentinel_value_resolvers[0]
+ )
+ ordered_rows = [
+ rows_by_sentinel[
+ _sentinel_value_resolver(
+ parameters[_sentinel_param_key] # type: ignore # noqa: E501
+ )
+ ]
+ for parameters in imv_batch.batch
+ ]
+ else:
+ # single-column sentinel with no value resolver
+ ordered_rows = [
+ rows_by_sentinel[
+ parameters[_sentinel_param_key] # type: ignore # noqa: E501
+ ]
+ for parameters in imv_batch.batch
+ ]
+ except KeyError as ke:
+ # see test_insert_exec.py::
+ # IMVSentinelTest::test_sentinel_cant_match_keys
+ # for coverage / demonstration
+ raise exc.InvalidRequestError(
+ f"Can't match sentinel values in result set to "
+ f"parameter sets; key {ke.args[0]!r} was not "
+ "found. "
+ "There may be a mismatch between the datatype "
+ "passed to the DBAPI driver vs. that which it "
+ "returns in a result row. Try using a different "
+ "datatype, such as integer"
+ ) from ke
+
+ result.extend(ordered_rows)
+
+ else:
+ result.extend(rows)
def do_executemany(self, cursor, statement, parameters, context=None):
cursor.executemany(statement, parameters)
@@ -1043,6 +1214,7 @@ class DefaultExecutionContext(ExecutionContext):
_empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
_insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
+ _num_sentinel_cols: int = 0
@classmethod
def _init_ddl(
@@ -1152,6 +1324,17 @@ class DefaultExecutionContext(ExecutionContext):
)
elif (
ii
+ and dml_statement._sort_by_parameter_order
+ and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
+ ):
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "INSERT..RETURNING with deterministic row ordering "
+ "when executemany is used"
+ )
+ elif (
+ ii
and self.dialect.use_insertmanyvalues
and not compiled._insertmanyvalues
):
@@ -1194,6 +1377,10 @@ class DefaultExecutionContext(ExecutionContext):
if len(parameters) > 1:
if self.isinsert and compiled._insertmanyvalues:
self.execute_style = ExecuteStyle.INSERTMANYVALUES
+
+ imv = compiled._insertmanyvalues
+ if imv.sentinel_columns is not None:
+ self._num_sentinel_cols = imv.num_sentinel_columns
else:
self.execute_style = ExecuteStyle.EXECUTEMANY
@@ -1525,6 +1712,9 @@ class DefaultExecutionContext(ExecutionContext):
self._is_server_side = False
return self.create_default_cursor()
+ def fetchall_for_returning(self, cursor):
+ return cursor.fetchall()
+
def create_default_cursor(self):
return self._dbapi_connection.cursor()
@@ -1689,6 +1879,13 @@ class DefaultExecutionContext(ExecutionContext):
)
if cursor_description is None:
strategy = _cursor._NO_CURSOR_DML
+ elif self._num_sentinel_cols:
+ assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
+ if cursor_description:
+ # strip out the sentinel columns from cursor description
+ cursor_description = cursor_description[
+ 0 : -(self._num_sentinel_cols)
+ ]
result: _cursor.CursorResult[Any] = _cursor.CursorResult(
self, strategy, cursor_description
@@ -2059,21 +2256,14 @@ class DefaultExecutionContext(ExecutionContext):
key_getter = compiled._within_exec_param_key_getter
- # pre-determine scalar Python-side defaults
- # to avoid many calls of get_insert_default()/
- # get_update_default()
+ sentinel_counter = 0
+
if compiled.insert_prefetch:
prefetch_recs = [
(
c,
key_getter(c),
- (
- c.default.arg, # type: ignore
- c.default.is_scalar,
- c.default.is_callable,
- )
- if c.default and c.default.has_arg
- else (None, None, None),
+ c._default_description_tuple,
self.get_insert_default,
)
for c in compiled.insert_prefetch
@@ -2083,13 +2273,7 @@ class DefaultExecutionContext(ExecutionContext):
(
c,
key_getter(c),
- (
- c.onupdate.arg, # type: ignore
- c.onupdate.is_scalar,
- c.onupdate.is_callable,
- )
- if c.onupdate and c.onupdate.has_arg
- else (None, None, None),
+ c._onupdate_description_tuple,
self.get_update_default,
)
for c in compiled.update_prefetch
@@ -2103,10 +2287,13 @@ class DefaultExecutionContext(ExecutionContext):
for (
c,
param_key,
- (arg, is_scalar, is_callable),
+ (arg, is_scalar, is_callable, is_sentinel),
fallback,
) in prefetch_recs:
- if is_scalar:
+ if is_sentinel:
+ param[param_key] = sentinel_counter
+ sentinel_counter += 1
+ elif is_scalar:
param[param_key] = arg
elif is_callable:
self.current_column = c