diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-07-18 15:08:37 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-09-24 11:15:32 -0400 |
| commit | 2bcc97da424eef7db9a5d02f81d02344925415ee (patch) | |
| tree | 13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /lib/sqlalchemy/engine/default.py | |
| parent | 332188e5680574368001ded52eb0a9d259ecdef5 (diff) | |
| download | sqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz | |
implement batched INSERT..VALUES () () for executemany
the feature is enabled for all built in backends
when RETURNING is used,
except for Oracle that doesn't need it, and on
psycopg2 and mssql+pyodbc it is used for all INSERT statements,
not just those that use RETURNING.
third party dialects would need to opt in to the new feature
by setting use_insertmanyvalues to True.
Also adds dialect-level guards against using returning
with executemany where we dont have an implementation to
suit it. execute single w/ returning still defers to the
server without us checking.
Fixes: #6047
Fixes: #7907
Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
Diffstat (limited to 'lib/sqlalchemy/engine/default.py')
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 160 |
1 files changed, 138 insertions, 22 deletions
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 3a53f8157..11ab713d0 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -35,6 +35,7 @@ from typing import Set from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import Union import weakref from . import characteristics @@ -44,6 +45,7 @@ from .base import Connection from .interfaces import CacheStats from .interfaces import DBAPICursor from .interfaces import Dialect +from .interfaces import ExecuteStyle from .interfaces import ExecutionContext from .reflection import ObjectKind from .reflection import ObjectScope @@ -52,13 +54,16 @@ from .. import exc from .. import pool from .. import util from ..sql import compiler +from ..sql import dml from ..sql import expression from ..sql import type_api from ..sql._typing import is_tuple_type +from ..sql.base import _NoArg from ..sql.compiler import DDLCompiler from ..sql.compiler import SQLCompiler from ..sql.elements import quoted_name from ..sql.schema import default_is_scalar +from ..util.typing import Final from ..util.typing import Literal if typing.TYPE_CHECKING: @@ -146,7 +151,6 @@ class DefaultDialect(Dialect): update_returning_multifrom = False delete_returning_multifrom = False insert_returning = False - insert_executemany_returning = False cte_follows_insert = False @@ -208,6 +212,10 @@ class DefaultDialect(Dialect): supports_default_metavalue = False """dialect supports INSERT... VALUES (DEFAULT) syntax""" + default_metavalue_token = "DEFAULT" + """for INSERT... VALUES (DEFAULT) syntax, the token to put in the + parenthesis.""" + # not sure if this is a real thing but the compiler will deliver it # if this is the only flag enabled. supports_empty_insert = True @@ -215,6 +223,13 @@ class DefaultDialect(Dialect): supports_multivalues_insert = False + use_insertmanyvalues: bool = False + + use_insertmanyvalues_wo_returning: bool = False + + insertmanyvalues_page_size: int = 1000 + insertmanyvalues_max_parameters = 32700 + supports_is_distinct_from = True supports_server_side_cursors = False @@ -272,6 +287,8 @@ class DefaultDialect(Dialect): supports_native_boolean: Optional[bool] = None, max_identifier_length: Optional[int] = None, label_length: Optional[int] = None, + insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, + use_insertmanyvalues: Optional[bool] = None, # util.deprecated_params decorator cannot render the # Linting.NO_LINTING constant compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore @@ -332,6 +349,12 @@ class DefaultDialect(Dialect): self.label_length = label_length self.compiler_linting = compiler_linting + if use_insertmanyvalues is not None: + self.use_insertmanyvalues = use_insertmanyvalues + + if insertmanyvalues_page_size is not _NoArg.NO_ARG: + self.insertmanyvalues_page_size = insertmanyvalues_page_size + @util.deprecated_property( "2.0", "full_returning is deprecated, please use insert_returning, " @@ -344,6 +367,17 @@ class DefaultDialect(Dialect): and self.delete_returning ) + @property + def insert_executemany_returning(self): + return ( + self.insert_returning + and self.supports_multivalues_insert + and self.use_insertmanyvalues + ) + + update_executemany_returning = False + delete_executemany_returning = False + @util.memoized_property def loaded_dbapi(self) -> ModuleType: if self.dbapi is None: @@ -682,6 +716,27 @@ class DefaultDialect(Dialect): def do_release_savepoint(self, connection, name): connection.execute(expression.ReleaseSavepointClause(name)) + def _deliver_insertmanyvalues_batches( + self, cursor, statement, parameters, generic_setinputsizes, context + ): + context = cast(DefaultExecutionContext, context) + compiled = cast(SQLCompiler, context.compiled) + + is_returning: Final[bool] = bool(compiled.effective_returning) + batch_size = context.execution_options.get( + "insertmanyvalues_page_size", self.insertmanyvalues_page_size + ) + + if is_returning: + context._insertmanyvalues_rows = result = [] + + for batch_rec in compiled._deliver_insertmanyvalues_batches( + statement, parameters, generic_setinputsizes, batch_size + ): + yield batch_rec + if is_returning: + result.extend(cursor.fetchall()) + def do_executemany(self, cursor, statement, parameters, context=None): cursor.executemany(statement, parameters) @@ -936,7 +991,8 @@ class DefaultExecutionContext(ExecutionContext): is_text = False isddl = False - executemany = False + execute_style: ExecuteStyle = ExecuteStyle.EXECUTE + compiled: Optional[Compiled] = None result_column_struct: Optional[ Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] @@ -982,6 +1038,8 @@ class DefaultExecutionContext(ExecutionContext): _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) + _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None + @classmethod def _init_ddl( cls, @@ -1061,23 +1119,55 @@ class DefaultExecutionContext(ExecutionContext): compiled._loose_column_name_matching, ) - self.isinsert = compiled.isinsert - self.isupdate = compiled.isupdate - self.isdelete = compiled.isdelete + self.isinsert = ii = compiled.isinsert + self.isupdate = iu = compiled.isupdate + self.isdelete = id_ = compiled.isdelete self.is_text = compiled.isplaintext - if self.isinsert or self.isupdate or self.isdelete: + if ii or iu or id_: if TYPE_CHECKING: assert isinstance(compiled.statement, UpdateBase) self.is_crud = True - self._is_explicit_returning = bool(compiled.statement._returning) - self._is_implicit_returning = is_implicit_returning = bool( + self._is_explicit_returning = ier = bool( + compiled.statement._returning + ) + self._is_implicit_returning = iir = is_implicit_returning = bool( compiled.implicit_returning ) assert not ( is_implicit_returning and compiled.statement._returning ) + if (ier or iir) and compiled.for_executemany: + if ii and not self.dialect.insert_executemany_returning: + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "INSERT..RETURNING when executemany is used" + ) + elif ( + ii + and self.dialect.use_insertmanyvalues + and not compiled._insertmanyvalues + ): + raise exc.InvalidRequestError( + 'Statement does not have "insertmanyvalues" ' + "enabled, can't use INSERT..RETURNING with " + "executemany in this case." + ) + elif iu and not self.dialect.update_executemany_returning: + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "UPDATE..RETURNING when executemany is used" + ) + elif id_ and not self.dialect.delete_executemany_returning: + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "DELETE..RETURNING when executemany is used" + ) + if not parameters: self.compiled_parameters = [ compiled.construct_params( @@ -1096,7 +1186,11 @@ class DefaultExecutionContext(ExecutionContext): for grp, m in enumerate(parameters) ] - self.executemany = len(parameters) > 1 + if len(parameters) > 1: + if self.isinsert and compiled._insertmanyvalues: + self.execute_style = ExecuteStyle.INSERTMANYVALUES + else: + self.execute_style = ExecuteStyle.EXECUTEMANY self.unicode_statement = compiled.string @@ -1238,7 +1332,8 @@ class DefaultExecutionContext(ExecutionContext): dialect.execute_sequence_format(p) for p in parameters ] - self.executemany = len(parameters) > 1 + if len(parameters) > 1: + self.execute_style = ExecuteStyle.EXECUTEMANY self.statement = self.unicode_statement = statement @@ -1293,6 +1388,13 @@ class DefaultExecutionContext(ExecutionContext): else: return "unknown" + @property + def executemany(self): + return self.execute_style in ( + ExecuteStyle.EXECUTEMANY, + ExecuteStyle.INSERTMANYVALUES, + ) + @util.memoized_property def identifier_preparer(self): if self.compiled: @@ -1555,7 +1657,23 @@ class DefaultExecutionContext(ExecutionContext): def _setup_dml_or_text_result(self): compiled = cast(SQLCompiler, self.compiled) + strategy = self.cursor_fetch_strategy + if self.isinsert: + if ( + self.execute_style is ExecuteStyle.INSERTMANYVALUES + and compiled.effective_returning + ): + strategy = _cursor.FullyBufferedCursorFetchStrategy( + self.cursor, + initial_buffer=self._insertmanyvalues_rows, + # maintain alt cursor description if set by the + # dialect, e.g. mssql preserves it + alternate_description=( + strategy.alternate_cursor_description + ), + ) + if compiled.postfetch_lastrowid: self.inserted_primary_key_rows = ( self._setup_ins_pk_from_lastrowid() @@ -1564,7 +1682,6 @@ class DefaultExecutionContext(ExecutionContext): # the default inserted_primary_key_rows accessor will # return an "empty" primary key collection when accessed. - strategy = self.cursor_fetch_strategy if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: strategy = _cursor.BufferedRowCursorFetchStrategy( self.cursor, self.execution_options @@ -1675,8 +1792,11 @@ class DefaultExecutionContext(ExecutionContext): cast(SQLCompiler, self.compiled).postfetch ) - def _set_input_sizes(self): - """Given a cursor and ClauseParameters, call the appropriate + def _prepare_set_input_sizes( + self, + ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: + """Given a cursor and ClauseParameters, prepare arguments + in order to call the appropriate style of ``setinputsizes()`` on the cursor, using DB-API types from the bind parameter's ``TypeEngine`` objects. @@ -1691,14 +1811,14 @@ class DefaultExecutionContext(ExecutionContext): """ if self.isddl or self.is_text: - return + return None compiled = cast(SQLCompiler, self.compiled) inputsizes = compiled._get_set_input_sizes_lookup() if inputsizes is None: - return + return None dialect = self.dialect @@ -1775,12 +1895,8 @@ class DefaultExecutionContext(ExecutionContext): generic_inputsizes.append( (escaped_name, dbtype, bindparam.type) ) - try: - dialect.do_set_input_sizes(self.cursor, generic_inputsizes, self) - except BaseException as e: - self.root_connection._handle_dbapi_exception( - e, None, None, None, self - ) + + return generic_inputsizes def _exec_default(self, column, default, type_): if default.is_sequence: @@ -1906,7 +2022,7 @@ class DefaultExecutionContext(ExecutionContext): assert compile_state is not None if ( isolate_multiinsert_groups - and self.isinsert + and dml.isinsert(compile_state) and compile_state._has_multi_parameters ): if column._is_multiparam_column: |
