summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/default.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-07-18 15:08:37 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-09-24 11:15:32 -0400
commit2bcc97da424eef7db9a5d02f81d02344925415ee (patch)
tree13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /lib/sqlalchemy/engine/default.py
parent332188e5680574368001ded52eb0a9d259ecdef5 (diff)
downloadsqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built in backends when RETURNING is used, except for Oracle that doesn't need it, and on psycopg2 and mssql+pyodbc it is used for all INSERT statements, not just those that use RETURNING. third party dialects would need to opt in to the new feature by setting use_insertmanyvalues to True. Also adds dialect-level guards against using returning with executemany where we dont have an implementation to suit it. execute single w/ returning still defers to the server without us checking. Fixes: #6047 Fixes: #7907 Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'lib/sqlalchemy/engine/default.py')
-rw-r--r--lib/sqlalchemy/engine/default.py160
1 files changed, 138 insertions, 22 deletions
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 3a53f8157..11ab713d0 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -35,6 +35,7 @@ from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import Union
import weakref
from . import characteristics
@@ -44,6 +45,7 @@ from .base import Connection
from .interfaces import CacheStats
from .interfaces import DBAPICursor
from .interfaces import Dialect
+from .interfaces import ExecuteStyle
from .interfaces import ExecutionContext
from .reflection import ObjectKind
from .reflection import ObjectScope
@@ -52,13 +54,16 @@ from .. import exc
from .. import pool
from .. import util
from ..sql import compiler
+from ..sql import dml
from ..sql import expression
from ..sql import type_api
from ..sql._typing import is_tuple_type
+from ..sql.base import _NoArg
from ..sql.compiler import DDLCompiler
from ..sql.compiler import SQLCompiler
from ..sql.elements import quoted_name
from ..sql.schema import default_is_scalar
+from ..util.typing import Final
from ..util.typing import Literal
if typing.TYPE_CHECKING:
@@ -146,7 +151,6 @@ class DefaultDialect(Dialect):
update_returning_multifrom = False
delete_returning_multifrom = False
insert_returning = False
- insert_executemany_returning = False
cte_follows_insert = False
@@ -208,6 +212,10 @@ class DefaultDialect(Dialect):
supports_default_metavalue = False
"""dialect supports INSERT... VALUES (DEFAULT) syntax"""
+ default_metavalue_token = "DEFAULT"
+ """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
+ parenthesis."""
+
# not sure if this is a real thing but the compiler will deliver it
# if this is the only flag enabled.
supports_empty_insert = True
@@ -215,6 +223,13 @@ class DefaultDialect(Dialect):
supports_multivalues_insert = False
+ use_insertmanyvalues: bool = False
+
+ use_insertmanyvalues_wo_returning: bool = False
+
+ insertmanyvalues_page_size: int = 1000
+ insertmanyvalues_max_parameters = 32700
+
supports_is_distinct_from = True
supports_server_side_cursors = False
@@ -272,6 +287,8 @@ class DefaultDialect(Dialect):
supports_native_boolean: Optional[bool] = None,
max_identifier_length: Optional[int] = None,
label_length: Optional[int] = None,
+ insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
+ use_insertmanyvalues: Optional[bool] = None,
# util.deprecated_params decorator cannot render the
# Linting.NO_LINTING constant
compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
@@ -332,6 +349,12 @@ class DefaultDialect(Dialect):
self.label_length = label_length
self.compiler_linting = compiler_linting
+ if use_insertmanyvalues is not None:
+ self.use_insertmanyvalues = use_insertmanyvalues
+
+ if insertmanyvalues_page_size is not _NoArg.NO_ARG:
+ self.insertmanyvalues_page_size = insertmanyvalues_page_size
+
@util.deprecated_property(
"2.0",
"full_returning is deprecated, please use insert_returning, "
@@ -344,6 +367,17 @@ class DefaultDialect(Dialect):
and self.delete_returning
)
+ @property
+ def insert_executemany_returning(self):
+ return (
+ self.insert_returning
+ and self.supports_multivalues_insert
+ and self.use_insertmanyvalues
+ )
+
+ update_executemany_returning = False
+ delete_executemany_returning = False
+
@util.memoized_property
def loaded_dbapi(self) -> ModuleType:
if self.dbapi is None:
@@ -682,6 +716,27 @@ class DefaultDialect(Dialect):
def do_release_savepoint(self, connection, name):
connection.execute(expression.ReleaseSavepointClause(name))
+ def _deliver_insertmanyvalues_batches(
+ self, cursor, statement, parameters, generic_setinputsizes, context
+ ):
+ context = cast(DefaultExecutionContext, context)
+ compiled = cast(SQLCompiler, context.compiled)
+
+ is_returning: Final[bool] = bool(compiled.effective_returning)
+ batch_size = context.execution_options.get(
+ "insertmanyvalues_page_size", self.insertmanyvalues_page_size
+ )
+
+ if is_returning:
+ context._insertmanyvalues_rows = result = []
+
+ for batch_rec in compiled._deliver_insertmanyvalues_batches(
+ statement, parameters, generic_setinputsizes, batch_size
+ ):
+ yield batch_rec
+ if is_returning:
+ result.extend(cursor.fetchall())
+
def do_executemany(self, cursor, statement, parameters, context=None):
cursor.executemany(statement, parameters)
@@ -936,7 +991,8 @@ class DefaultExecutionContext(ExecutionContext):
is_text = False
isddl = False
- executemany = False
+ execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
+
compiled: Optional[Compiled] = None
result_column_struct: Optional[
Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
@@ -982,6 +1038,8 @@ class DefaultExecutionContext(ExecutionContext):
_empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
+ _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
+
@classmethod
def _init_ddl(
cls,
@@ -1061,23 +1119,55 @@ class DefaultExecutionContext(ExecutionContext):
compiled._loose_column_name_matching,
)
- self.isinsert = compiled.isinsert
- self.isupdate = compiled.isupdate
- self.isdelete = compiled.isdelete
+ self.isinsert = ii = compiled.isinsert
+ self.isupdate = iu = compiled.isupdate
+ self.isdelete = id_ = compiled.isdelete
self.is_text = compiled.isplaintext
- if self.isinsert or self.isupdate or self.isdelete:
+ if ii or iu or id_:
if TYPE_CHECKING:
assert isinstance(compiled.statement, UpdateBase)
self.is_crud = True
- self._is_explicit_returning = bool(compiled.statement._returning)
- self._is_implicit_returning = is_implicit_returning = bool(
+ self._is_explicit_returning = ier = bool(
+ compiled.statement._returning
+ )
+ self._is_implicit_returning = iir = is_implicit_returning = bool(
compiled.implicit_returning
)
assert not (
is_implicit_returning and compiled.statement._returning
)
+ if (ier or iir) and compiled.for_executemany:
+ if ii and not self.dialect.insert_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "INSERT..RETURNING when executemany is used"
+ )
+ elif (
+ ii
+ and self.dialect.use_insertmanyvalues
+ and not compiled._insertmanyvalues
+ ):
+ raise exc.InvalidRequestError(
+ 'Statement does not have "insertmanyvalues" '
+ "enabled, can't use INSERT..RETURNING with "
+ "executemany in this case."
+ )
+ elif iu and not self.dialect.update_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "UPDATE..RETURNING when executemany is used"
+ )
+ elif id_ and not self.dialect.delete_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "DELETE..RETURNING when executemany is used"
+ )
+
if not parameters:
self.compiled_parameters = [
compiled.construct_params(
@@ -1096,7 +1186,11 @@ class DefaultExecutionContext(ExecutionContext):
for grp, m in enumerate(parameters)
]
- self.executemany = len(parameters) > 1
+ if len(parameters) > 1:
+ if self.isinsert and compiled._insertmanyvalues:
+ self.execute_style = ExecuteStyle.INSERTMANYVALUES
+ else:
+ self.execute_style = ExecuteStyle.EXECUTEMANY
self.unicode_statement = compiled.string
@@ -1238,7 +1332,8 @@ class DefaultExecutionContext(ExecutionContext):
dialect.execute_sequence_format(p) for p in parameters
]
- self.executemany = len(parameters) > 1
+ if len(parameters) > 1:
+ self.execute_style = ExecuteStyle.EXECUTEMANY
self.statement = self.unicode_statement = statement
@@ -1293,6 +1388,13 @@ class DefaultExecutionContext(ExecutionContext):
else:
return "unknown"
+ @property
+ def executemany(self):
+ return self.execute_style in (
+ ExecuteStyle.EXECUTEMANY,
+ ExecuteStyle.INSERTMANYVALUES,
+ )
+
@util.memoized_property
def identifier_preparer(self):
if self.compiled:
@@ -1555,7 +1657,23 @@ class DefaultExecutionContext(ExecutionContext):
def _setup_dml_or_text_result(self):
compiled = cast(SQLCompiler, self.compiled)
+ strategy = self.cursor_fetch_strategy
+
if self.isinsert:
+ if (
+ self.execute_style is ExecuteStyle.INSERTMANYVALUES
+ and compiled.effective_returning
+ ):
+ strategy = _cursor.FullyBufferedCursorFetchStrategy(
+ self.cursor,
+ initial_buffer=self._insertmanyvalues_rows,
+ # maintain alt cursor description if set by the
+ # dialect, e.g. mssql preserves it
+ alternate_description=(
+ strategy.alternate_cursor_description
+ ),
+ )
+
if compiled.postfetch_lastrowid:
self.inserted_primary_key_rows = (
self._setup_ins_pk_from_lastrowid()
@@ -1564,7 +1682,6 @@ class DefaultExecutionContext(ExecutionContext):
# the default inserted_primary_key_rows accessor will
# return an "empty" primary key collection when accessed.
- strategy = self.cursor_fetch_strategy
if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
strategy = _cursor.BufferedRowCursorFetchStrategy(
self.cursor, self.execution_options
@@ -1675,8 +1792,11 @@ class DefaultExecutionContext(ExecutionContext):
cast(SQLCompiler, self.compiled).postfetch
)
- def _set_input_sizes(self):
- """Given a cursor and ClauseParameters, call the appropriate
+ def _prepare_set_input_sizes(
+ self,
+ ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
+ """Given a cursor and ClauseParameters, prepare arguments
+ in order to call the appropriate
style of ``setinputsizes()`` on the cursor, using DB-API types
from the bind parameter's ``TypeEngine`` objects.
@@ -1691,14 +1811,14 @@ class DefaultExecutionContext(ExecutionContext):
"""
if self.isddl or self.is_text:
- return
+ return None
compiled = cast(SQLCompiler, self.compiled)
inputsizes = compiled._get_set_input_sizes_lookup()
if inputsizes is None:
- return
+ return None
dialect = self.dialect
@@ -1775,12 +1895,8 @@ class DefaultExecutionContext(ExecutionContext):
generic_inputsizes.append(
(escaped_name, dbtype, bindparam.type)
)
- try:
- dialect.do_set_input_sizes(self.cursor, generic_inputsizes, self)
- except BaseException as e:
- self.root_connection._handle_dbapi_exception(
- e, None, None, None, self
- )
+
+ return generic_inputsizes
def _exec_default(self, column, default, type_):
if default.is_sequence:
@@ -1906,7 +2022,7 @@ class DefaultExecutionContext(ExecutionContext):
assert compile_state is not None
if (
isolate_multiinsert_groups
- and self.isinsert
+ and dml.isinsert(compile_state)
and compile_state._has_multi_parameters
):
if column._is_multiparam_column: