Diffstat (limited to 'lib')
28 files changed, 1658 insertions, 473 deletions
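The patch below spans dialect documentation, engine internals and test provisioning. As a rough usage sketch of the public API it introduces (the URL, table and column names here are placeholders, not part of the diff), the new ``use_insertmanyvalues`` and ``insertmanyvalues_page_size`` parameters on ``create_engine()``, together with the per-statement execution option of the same name, enable batched executemany-style INSERT..RETURNING::

    from sqlalchemy import Column, Integer, MetaData, Table, create_engine

    metadata = MetaData()
    t = Table(
        "t",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("x", Integer),
    )

    # both parameters are introduced by this patch; use_insertmanyvalues
    # defaults to True and insertmanyvalues_page_size defaults to 1000
    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@host/dbname",
        use_insertmanyvalues=True,
        insertmanyvalues_page_size=1000,
    )

    with engine.begin() as conn:
        metadata.create_all(conn)

        # an executemany-style INSERT..RETURNING; with "insertmanyvalues"
        # enabled, the single-row INSERT is rewritten into one or more
        # multi-VALUES statements invoked via cursor.execute(), rather than
        # being passed to cursor.executemany()
        result = conn.execute(
            t.insert().returning(t.c.id),
            [{"x": i} for i in range(5000)],
        )
        ids = result.scalars().all()

    # the page size may also be set for a single statement via the
    # equivalent execution option
    with engine.begin() as conn:
        conn.execution_options(insertmanyvalues_page_size=100).execute(
            t.insert().returning(t.c.id),
            [{"x": i} for i in range(500)],
        )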
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index c7e88a643..085a2c27b 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -250,6 +250,19 @@ The process for fetching this value has several variants: INSERT INTO t (x) OUTPUT inserted.id VALUES (?) + As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also + used by default to optimize many-row INSERT statements; for SQL Server + the feature applies to both RETURNING and non-RETURNING + INSERT statements. + +* The value of :paramref:`_sa.create_engine.insertmanyvalues_page_size` + defaults to 1000; however, the ultimate page size for a particular INSERT + statement may be limited further, based on an observed limit of + 2100 bound parameters for a single statement in SQL Server. + The page size may also be modified on a per-engine + or per-statement basis; see the section + :ref:`engine_insertmanyvalues_page_size` for details. + * When RETURNING is not available or has been disabled via ``implicit_returning=False``, either the ``scope_identity()`` function or the ``@@identity`` variable is used; behavior varies by backend: @@ -258,9 +271,13 @@ The process for fetching this value has several variants: appended to the end of the INSERT statement; a second result set will be fetched in order to receive the value. Given a table as:: - t = Table('t', m, Column('id', Integer, primary_key=True), - Column('x', Integer), - implicit_returning=False) + t = Table( + 't', + metadata, + Column('id', Integer, primary_key=True), + Column('x', Integer), + implicit_returning=False + ) an INSERT will look like: @@ -731,6 +748,8 @@ compatibility level information. Because of this, if running under a backwards compatibility mode SQLAlchemy may attempt to use T-SQL statements that are unable to be parsed by the database server. +.. _mssql_triggers: + Triggers -------- @@ -755,9 +774,6 @@ Declarative form:: __table_args__ = {'implicit_returning':False} -This option can also be specified engine-wide using the -``implicit_returning=False`` argument on :func:`_sa.create_engine`. - .. _mssql_rowcount_versioning: Rowcount Support / ORM Versioning @@ -2846,6 +2862,12 @@ class MSDialect(default.DefaultDialect): supports_empty_insert = False supports_comments = True + supports_default_metavalue = False + """dialect supports INSERT... VALUES (DEFAULT) syntax - + SQL Server **does** support this, but **not** for the IDENTITY column, + so we can't turn this on. + + """ # supports_native_uuid is partial here, so we implement our # own impl type @@ -2892,6 +2914,19 @@ class MSDialect(default.DefaultDialect): non_native_boolean_check_constraint = False supports_unicode_binds = True postfetch_lastrowid = True + + # may be changed at server inspection time for older SQL server versions + supports_multivalues_insert = True + + use_insertmanyvalues = True + + use_insertmanyvalues_wo_returning = True + + # "The incoming request has too many parameters. The server supports a " + # "maximum of 2100 parameters." + # in fact you can have 2099 parameters.
+ insertmanyvalues_max_parameters = 2099 + _supports_offset_fetch = False _supports_nvarchar_max = False @@ -3054,6 +3089,9 @@ class MSDialect(default.DefaultDialect): if self.server_version_info >= MS_2008_VERSION: self.supports_multivalues_insert = True + else: + self.supports_multivalues_insert = False + if self.deprecate_large_types is None: self.deprecate_large_types = ( self.server_version_info >= MS_2012_VERSION diff --git a/lib/sqlalchemy/dialects/mssql/pyodbc.py b/lib/sqlalchemy/dialects/mssql/pyodbc.py index 22e385865..2eef971cc 100644 --- a/lib/sqlalchemy/dialects/mssql/pyodbc.py +++ b/lib/sqlalchemy/dialects/mssql/pyodbc.py @@ -289,23 +289,34 @@ versioning. Fast Executemany Mode --------------------- -The Pyodbc driver has added support for a "fast executemany" mode of execution +.. note:: SQLAlchemy 2.0 now includes an equivalent "fast executemany" + handler for INSERT statements that is more robust than the PyODBC feature; + the feature is called :ref:`insertmanyvalues <engine_insertmanyvalues>` + and is enabled by default for all INSERT statements used by SQL Server. + SQLAlchemy's feature integrates with the PyODBC ``setinputsizes()`` method + which allows for more accurate specification of datatypes, and additionally + uses a dynamically sized, batched approach that scales to any number of + columns and/or rows. + + The PyODBC ``fast_executemany`` parameter may be used at the same time + as ``insertmanyvalues`` is enabled; however, it will apply in fewer + cases, as INSERT statements that are invoked using Core + :class:`.Insert` constructs, as well as all ORM use, no longer use the + ``.executemany()`` DBAPI cursor method. + +The PyODBC driver includes support for a "fast executemany" mode of execution which greatly reduces round trips for a DBAPI ``executemany()`` call when using Microsoft ODBC drivers, for **limited size batches that fit in memory**. The -feature is enabled by setting the flag ``.fast_executemany`` on the DBAPI -cursor when an executemany call is to be used. The SQLAlchemy pyodbc SQL -Server dialect supports setting this flag automatically when the -``.fast_executemany`` flag is passed to -:func:`_sa.create_engine` ; note that the ODBC driver must be the Microsoft -driver in order to use this flag:: +feature is enabled by setting the attribute ``.fast_executemany`` on the DBAPI +cursor when an executemany call is to be used. The SQLAlchemy PyODBC SQL +Server dialect supports this parameter by passing the +``fast_executemany`` parameter to +:func:`_sa.create_engine` , when using the **Microsoft ODBC driver only**:: engine = create_engine( - "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server", + "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True) -.. warning:: The pyodbc fast_executemany mode **buffers all rows in memory** and is - not compatible with very large batches of data. A future version of SQLAlchemy - may support this flag as a per-execution option instead. .. versionadded:: 1.3 @@ -319,11 +330,13 @@ driver in order to use this flag:: Setinputsizes Support ----------------------- -As of version 2.0, the pyodbc ``cursor.setinputsizes()`` method is used by -default except for .executemany() calls when fast_executemany=True.
+As of version 2.0, the pyodbc ``cursor.setinputsizes()`` method is used for +all statement executions, except for ``cursor.executemany()`` calls when +``fast_executemany=True``, where it is not supported (assuming +:ref:`insertmanyvalues <engine_insertmanyvalues>` is kept enabled, +``fast_executemany`` will not take place for INSERT statements in any case). -The behavior of setinputsizes can be customized, as may be necessary -particularly if fast_executemany is in use, via the +The behavior of setinputsizes can be customized via the :meth:`.DialectEvents.do_setinputsizes` hook. See that method for usage examples. @@ -331,7 +344,8 @@ examples. unless ``use_setinputsizes=True`` is passed. .. versionchanged:: 2.0 The mssql+pyodbc dialect now defaults to using - setinputsizes except for .executemany() calls when fast_executemany=True. + setinputsizes for all statement executions with the exception of + cursor.executemany() calls when fast_executemany=True. """ # noqa diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index c0521f61e..a3e99514b 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -2402,6 +2402,8 @@ class MySQLDialect(default.DefaultDialect): supports_default_values = False supports_default_metavalue = True + use_insertmanyvalues: bool = True + supports_sane_rowcount = True supports_sane_multi_rowcount = False supports_multivalues_insert = True diff --git a/lib/sqlalchemy/dialects/mysql/provision.py b/lib/sqlalchemy/dialects/mysql/provision.py index c73875fec..36a5e9f54 100644 --- a/lib/sqlalchemy/dialects/mysql/provision.py +++ b/lib/sqlalchemy/dialects/mysql/provision.py @@ -6,6 +6,7 @@ from ...testing.provision import create_db from ...testing.provision import drop_db from ...testing.provision import generate_driver_url from ...testing.provision import temp_table_keyword_args +from ...testing.provision import upsert @generate_driver_url.for_db("mysql", "mariadb") @@ -78,3 +79,19 @@ def _mysql_drop_db(cfg, eng, ident): @temp_table_keyword_args.for_db("mysql", "mariadb") def _mysql_temp_table_keyword_args(cfg, eng): return {"prefixes": ["TEMPORARY"]} + + +@upsert.for_db("mariadb") +def _upsert(cfg, table, returning, set_lambda=None): + from sqlalchemy.dialects.mysql import insert + + stmt = insert(table) + + if set_lambda: + stmt = stmt.on_duplicate_key_update(**set_lambda(stmt.inserted)) + else: + pk1 = table.primary_key.c[0] + stmt = stmt.on_duplicate_key_update({pk1.key: pk1}) + + stmt = stmt.returning(*returning) + return stmt diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py index d9fb5c827..4571f51f7 100644 --- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py +++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py @@ -897,6 +897,8 @@ class OracleDialect_cx_oracle(OracleDialect): supports_sane_multi_rowcount = True insert_executemany_returning = True + update_executemany_returning = True + delete_executemany_returning = True bind_typing = interfaces.BindTyping.SETINPUTSIZES diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index c953d3447..96bac59d9 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -418,7 +418,6 @@ class AsyncAdapt_asyncpg_cursor: "description", "arraysize", "rowcount", - "_inputsizes", "_cursor", "_invalidate_schema_cache_asof", ) @@ -433,7 +432,6 @@ class AsyncAdapt_asyncpg_cursor: self.description = None
self.arraysize = 1 self.rowcount = -1 - self._inputsizes = None self._invalidate_schema_cache_asof = 0 def close(self): diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 18a7c0a86..3e43d601f 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -1330,7 +1330,6 @@ from typing import List from typing import Optional from . import array as _array -from . import dml from . import hstore as _hstore from . import json as _json from . import pg_catalog @@ -1850,24 +1849,6 @@ class PGCompiler(compiler.SQLCompiler): return target_text - @util.memoized_property - def _is_safe_for_fast_insert_values_helper(self): - # don't allow fast executemany if _post_values_clause is - # present and is not an OnConflictDoNothing. what this means - # concretely is that the - # "fast insert executemany helper" won't be used, in other - # words we won't convert "executemany()" of many parameter - # sets into a single INSERT with many elements in VALUES. - # We can't apply that optimization safely if for example the - # statement includes a clause like "ON CONFLICT DO UPDATE" - - return self.insert_single_values_expr is not None and ( - self.statement._post_values_clause is None - or isinstance( - self.statement._post_values_clause, dml.OnConflictDoNothing - ) - ) - def visit_on_conflict_do_nothing(self, on_conflict, **kw): target_text = self._on_conflict_target(on_conflict, **kw) @@ -2804,6 +2785,7 @@ class PGDialect(default.DefaultDialect): sequences_optional = True preexecute_autoincrement_sequences = True postfetch_lastrowid = False + use_insertmanyvalues = True supports_comments = True supports_constraint_comments = True @@ -2813,6 +2795,7 @@ class PGDialect(default.DefaultDialect): supports_empty_insert = False supports_multivalues_insert = True + supports_identity_columns = True default_paramstyle = "pyformat" diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py index 0d17f28e0..8dd8a4995 100644 --- a/lib/sqlalchemy/dialects/postgresql/provision.py +++ b/lib/sqlalchemy/dialects/postgresql/provision.py @@ -14,6 +14,7 @@ from ...testing.provision import log from ...testing.provision import prepare_for_drop_tables from ...testing.provision import set_default_schema_on_connection from ...testing.provision import temp_table_keyword_args +from ...testing.provision import upsert @create_db.for_db("postgresql") @@ -125,3 +126,20 @@ def prepare_for_drop_tables(config, connection): "idle in transaction: %s" % ("; ".join(row._mapping["query"] for row in rows)) ) + + +@upsert.for_db("postgresql") +def _upsert(cfg, table, returning, set_lambda=None): + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(table) + + if set_lambda: + stmt = stmt.on_conflict_do_update( + index_elements=table.primary_key, set_=set_lambda(stmt.excluded) + ) + else: + stmt = stmt.on_conflict_do_nothing() + + stmt = stmt.returning(*returning) + return stmt diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index a01f20e99..350f4b616 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -230,25 +230,23 @@ Modern versions of psycopg2 include a feature known as `Fast Execution Helpers \ <https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which have been shown in benchmarking to improve psycopg2's executemany() -performance, 
primarily with INSERT statements, by multiple orders of magnitude. -SQLAlchemy internally makes use of these extensions for ``executemany()`` style -calls, which correspond to lists of parameters being passed to -:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter -sets <tutorial_multiple_parameters>`. The ORM also uses this mode internally whenever -possible. - -The two available extensions on the psycopg2 side are the ``execute_values()`` -and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the -``execute_values()`` extension for all qualifying INSERT statements. - -.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode - ``"values_only"`` for ``executemany_mode``, which allows an order of - magnitude performance improvement for INSERT statements, but does not - include "batch" mode for UPDATE and DELETE statements which removes the - ability of ``cursor.rowcount`` to function correctly. - -The use of these extensions is controlled by the ``executemany_mode`` flag -which may be passed to :func:`_sa.create_engine`:: +performance, primarily with INSERT statements, by at least +an order of magnitude. + +SQLAlchemy implements a native form of the "insert many values" +handler that will rewrite a single-row INSERT statement to accommodate +many values at once within an extended VALUES clause; this handler is +equivalent to psycopg2's ``execute_values()`` handler; an overview of this +feature and its configuration are at :ref:`engine_insertmanyvalues`. + +.. versionadded:: 2.0 Replaced psycopg2's ``execute_values()`` fast execution + helper with a native SQLAlchemy mechanism referred to as + :ref:`insertmanyvalues <engine_insertmanyvalues>`. + +The psycopg2 dialect retains the ability to use the psycopg2-specific +``execute_batch()`` feature, although it is not expected that this is a widely +used feature. The use of this extension may be enabled using the +``executemany_mode`` flag which may be passed to :func:`_sa.create_engine`:: engine = create_engine( "postgresql+psycopg2://scott:tiger@host/dbname", @@ -257,59 +255,55 @@ which may be passed to :func:`_sa.create_engine`:: Possible options for ``executemany_mode`` include: -* ``values_only`` - this is the default value. the psycopg2 execute_values() - extension is used for qualifying INSERT statements, which rewrites the INSERT - to include multiple VALUES clauses so that many parameter sets can be - inserted with one statement. - - .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode`` - which is also now the default. - -* ``None`` - No psycopg2 extensions are not used, and the usual - ``cursor.executemany()`` method is used when invoking statements with - multiple parameter sets. - -* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying - INSERT, UPDATE and DELETE statements, so that multiple copies - of a SQL query, each one corresponding to a parameter set passed to - ``executemany()``, are joined into a single SQL string separated by a - semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount` - attribute will not contain a value for executemany-style executions. - -* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT - statements, ``execute_batch`` is used for UPDATE and DELETE. - When using this mode, the :attr:`_engine.CursorResult.rowcount` +* ``values_only`` - this is the default value.
SQLAlchemy's native + :ref:`insertmanyvalues <engine_insertmanyvalues>` handler is used for qualifying + INSERT statements, assuming + :paramref:`_sa.create_engine.use_insertmanyvalues` is left at + its default value of ``True``. This handler rewrites simple + INSERT statements to include multiple VALUES clauses so that many + parameter sets can be inserted with one statement. + +* ``'values_plus_batch'`` - SQLAlchemy's native + :ref:`insertmanyvalues <engine_insertmanyvalues>` handler is used for qualifying + INSERT statements, assuming + :paramref:`_sa.create_engine.use_insertmanyvalues` is left at its default + value of ``True``. Then, psycopg2's ``execute_batch()`` handler is used for + qualifying UPDATE and DELETE statements when executed with multiple parameter + sets. When using this mode, the :attr:`_engine.CursorResult.rowcount` attribute will not contain a value for executemany-style executions against UPDATE and DELETE statements. -By "qualifying statements", we mean that the statement being executed -must be a Core :func:`_expression.insert`, :func:`_expression.update` -or :func:`_expression.delete` construct, and not a plain textual SQL -string or one constructed using :func:`_expression.text`. When using the -ORM, all insert/update/delete statements used by the ORM flush process +.. versionchanged:: 2.0 Removed the ``'batch'`` and ``'None'`` options + from psycopg2 ``executemany_mode``. Control over batching for INSERT + statements is now configured via the + :paramref:`_sa.create_engine.use_insertmanyvalues` engine-level parameter. + +The term "qualifying statements" means that the statement being executed +must be a Core :func:`_expression.insert`, :func:`_expression.update` +or :func:`_expression.delete` construct, and **not** a plain textual SQL +string or one constructed using :func:`_expression.text`. It also may **not** be +a special "extension" statement such as an "ON CONFLICT" "upsert" statement. +When using the ORM, all insert/update/delete statements used by the ORM flush process are qualifying. -The "page size" for the "values" and "batch" strategies can be affected -by using the ``executemany_batch_page_size`` and -``executemany_values_page_size`` engine parameters. These -control how many parameter sets -should be represented in each execution. The "values" page size defaults -to 1000, which is different that psycopg2's default. The "batch" page -size defaults to 100. These can be affected by passing new values to -:func:`_engine.create_engine`:: +The "page size" for the psycopg2 "batch" strategy can be affected +by using the ``executemany_batch_page_size`` parameter, which defaults to +100. + +For the "insertmanyvalues" feature, the page size can be controlled using the +:paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter, +which defaults to 1000. An example of modifying both parameters +is below:: engine = create_engine( "postgresql+psycopg2://scott:tiger@host/dbname", - executemany_mode='values', - executemany_values_page_size=10000, executemany_batch_page_size=500) - -.. versionchanged:: 1.4 - - The default for ``executemany_values_page_size`` is now 1000, up from - 100. + executemany_mode='values_plus_batch', + insertmanyvalues_page_size=5000, executemany_batch_page_size=500) ..
seealso:: + :ref:`engine_insertmanyvalues` - background on "insertmanyvalues" + :ref:`tutorial_multiple_parameters` - General information on using the :class:`_engine.Connection` object to execute statements in such a way as to make @@ -484,13 +478,11 @@ from typing import cast from . import ranges from ._psycopg_common import _PGDialect_common_psycopg from ._psycopg_common import _PGExecutionContext_common_psycopg -from .base import PGCompiler from .base import PGIdentifierPreparer from .json import JSON from .json import JSONB from ... import types as sqltypes from ... import util -from ...engine import cursor as _cursor from ...util import FastIntFlag from ...util import parse_user_argument_for_enum @@ -561,22 +553,6 @@ class PGExecutionContext_psycopg2(_PGExecutionContext_common_psycopg): _psycopg2_fetched_rows = None def post_exec(self): - if ( - self._psycopg2_fetched_rows - and self.compiled - and self.compiled.effective_returning - ): - # psycopg2 execute_values will provide for a real cursor where - # cursor.description works correctly. however, it executes the - # INSERT statement multiple times for multiple pages of rows, so - # while this cursor also supports calling .fetchall() directly, in - # order to get the list of all rows inserted across multiple pages, - # we have to retrieve the aggregated list from the execute_values() - # function directly. - strat_cls = _cursor.FullyBufferedCursorFetchStrategy - self.cursor_fetch_strategy = strat_cls( - self.cursor, initial_buffer=self._psycopg2_fetched_rows - ) self._log_notices(self.cursor) def _log_notices(self, cursor): @@ -597,24 +573,16 @@ class PGExecutionContext_psycopg2(_PGExecutionContext_common_psycopg): cursor.connection.notices[:] = [] -class PGCompiler_psycopg2(PGCompiler): - pass - - class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): pass class ExecutemanyMode(FastIntFlag): - EXECUTEMANY_PLAIN = 0 - EXECUTEMANY_BATCH = 1 - EXECUTEMANY_VALUES = 2 - EXECUTEMANY_VALUES_PLUS_BATCH = EXECUTEMANY_BATCH | EXECUTEMANY_VALUES + EXECUTEMANY_VALUES = 0 + EXECUTEMANY_VALUES_PLUS_BATCH = 1 ( - EXECUTEMANY_PLAIN, - EXECUTEMANY_BATCH, EXECUTEMANY_VALUES, EXECUTEMANY_VALUES_PLUS_BATCH, ) = tuple(ExecutemanyMode) @@ -630,9 +598,9 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg): # set to true based on psycopg2 version supports_sane_multi_rowcount = False execution_ctx_cls = PGExecutionContext_psycopg2 - statement_compiler = PGCompiler_psycopg2 preparer = PGIdentifierPreparer_psycopg2 psycopg2_version = (0, 0) + use_insertmanyvalues_wo_returning = True _has_native_hstore = True @@ -655,7 +623,6 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg): self, executemany_mode="values_only", executemany_batch_page_size=100, - executemany_values_page_size=1000, **kwargs, ): _PGDialect_common_psycopg.__init__(self, **kwargs) @@ -665,19 +632,13 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg): self.executemany_mode = parse_user_argument_for_enum( executemany_mode, { - EXECUTEMANY_PLAIN: [None], - EXECUTEMANY_BATCH: ["batch"], EXECUTEMANY_VALUES: ["values_only"], - EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"], + EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch"], }, "executemany_mode", ) - if self.executemany_mode & EXECUTEMANY_VALUES: - self.insert_executemany_returning = True - self.executemany_batch_page_size = executemany_batch_page_size - self.executemany_values_page_size = executemany_values_page_size if self.dbapi and hasattr(self.dbapi, "__version__"): m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", 
self.dbapi.__version__) @@ -699,14 +660,8 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg): is not None ) - # PGDialect.initialize() checks server version for <= 8.2 and sets - # this flag to False if so - if not self.insert_returning: - self.insert_executemany_returning = False - self.executemany_mode = EXECUTEMANY_PLAIN - - self.supports_sane_multi_rowcount = not ( - self.executemany_mode & EXECUTEMANY_BATCH + self.supports_sane_multi_rowcount = ( + self.executemany_mode is not EXECUTEMANY_VALUES_PLUS_BATCH ) @classmethod @@ -806,39 +761,7 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg): return None def do_executemany(self, cursor, statement, parameters, context=None): - if ( - self.executemany_mode & EXECUTEMANY_VALUES - and context - and context.isinsert - and context.compiled._is_safe_for_fast_insert_values_helper - ): - executemany_values = ( - "(%s)" % context.compiled.insert_single_values_expr - ) - - # guard for statement that was altered via event hook or similar - if executemany_values not in statement: - executemany_values = None - else: - executemany_values = None - - if executemany_values: - statement = statement.replace(executemany_values, "%s") - if self.executemany_values_page_size: - kwargs = {"page_size": self.executemany_values_page_size} - else: - kwargs = {} - xtras = self._psycopg2_extras - context._psycopg2_fetched_rows = xtras.execute_values( - cursor, - statement, - parameters, - template=executemany_values, - fetch=bool(context.compiled.effective_returning), - **kwargs, - ) - - elif self.executemany_mode & EXECUTEMANY_BATCH: + if self.executemany_mode is EXECUTEMANY_VALUES_PLUS_BATCH: if self.executemany_batch_page_size: kwargs = {"page_size": self.executemany_batch_page_size} else: diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 88c6dbe18..e57a84fe0 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -1936,6 +1936,7 @@ class SQLiteDialect(default.DefaultDialect): supports_empty_insert = False supports_cast = True supports_multivalues_insert = True + use_insertmanyvalues = True tuple_in_values = True supports_statement_cache = True insert_null_pk_still_autoincrements = True @@ -1944,6 +1945,13 @@ class SQLiteDialect(default.DefaultDialect): delete_returning = True update_returning_multifrom = True + supports_default_metavalue = True + """dialect supports INSERT... VALUES (DEFAULT) syntax""" + + default_metavalue_token = "NULL" + """for INSERT... 
VALUES (DEFAULT) syntax, the token to put in the + parenthesis.""" + default_paramstyle = "qmark" execution_ctx_cls = SQLiteExecutionContext statement_compiler = SQLiteCompiler @@ -2055,6 +2063,10 @@ class SQLiteDialect(default.DefaultDialect): self.delete_returning ) = self.insert_returning = False + if self.dbapi.sqlite_version_info < (3, 32, 0): + # https://www.sqlite.org/limits.html + self.insertmanyvalues_max_parameters = 999 + _isolation_lookup = util.immutabledict( {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} ) diff --git a/lib/sqlalchemy/dialects/sqlite/provision.py b/lib/sqlalchemy/dialects/sqlite/provision.py index a590f9f03..05ee6c625 100644 --- a/lib/sqlalchemy/dialects/sqlite/provision.py +++ b/lib/sqlalchemy/dialects/sqlite/provision.py @@ -14,6 +14,7 @@ from ...testing.provision import post_configure_engine from ...testing.provision import run_reap_dbs from ...testing.provision import stop_test_class_outside_fixtures from ...testing.provision import temp_table_keyword_args +from ...testing.provision import upsert # TODO: I can't get this to build dynamically with pytest-xdist procs @@ -142,3 +143,18 @@ def _reap_sqlite_dbs(url, idents): if os.path.exists(path): log.info("deleting SQLite database file: %s" % path) os.remove(path) + + +@upsert.for_db("sqlite") +def _upsert(cfg, table, returning, set_lambda=None): + from sqlalchemy.dialects.sqlite import insert + + stmt = insert(table) + + if set_lambda: + stmt = stmt.on_conflict_do_update(set_=set_lambda(stmt.excluded)) + else: + stmt = stmt.on_conflict_do_nothing() + + stmt = stmt.returning(*returning) + return stmt diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 2b9cf602a..1b07acab5 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -12,6 +12,7 @@ import typing from typing import Any from typing import Callable from typing import cast +from typing import Iterable from typing import Iterator from typing import List from typing import Mapping @@ -29,6 +30,7 @@ from .interfaces import BindTyping from .interfaces import ConnectionEventsTarget from .interfaces import DBAPICursor from .interfaces import ExceptionContext +from .interfaces import ExecuteStyle from .interfaces import ExecutionContext from .util import _distill_params_20 from .util import _distill_raw_params @@ -438,6 +440,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` describing the ORM version of ``yield_per`` + :param insertmanyvalues_page_size: number of rows to format into an + INSERT statement when the statement uses "insertmanyvalues" mode, + which is a paged form of bulk insert that is used for many backends + when using :term:`executemany` execution typically in conjunction + with RETURNING. Defaults to 1000. May also be modified on a + per-engine basis using the + :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`engine_insertmanyvalues` + :param schema_translate_map: Available on: :class:`_engine.Connection`, :class:`_engine.Engine`, :class:`_sql.Executable`. 
@@ -1795,8 +1811,39 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): context.pre_exec() + if context.execute_style is ExecuteStyle.INSERTMANYVALUES: + return self._exec_insertmany_context( + dialect, + context, + ) + else: + return self._exec_single_context( + dialect, context, statement, parameters + ) + + def _exec_single_context( + self, + dialect: Dialect, + context: ExecutionContext, + statement: Union[str, Compiled], + parameters: Optional[_AnyMultiExecuteParams], + ) -> CursorResult[Any]: + """continue the _execute_context() method for a single DBAPI + cursor.execute() or cursor.executemany() call. + + """ if dialect.bind_typing is BindTyping.SETINPUTSIZES: - context._set_input_sizes() + generic_setinputsizes = context._prepare_set_input_sizes() + + if generic_setinputsizes: + try: + dialect.do_set_input_sizes( + context.cursor, generic_setinputsizes, context + ) + except BaseException as e: + self._handle_dbapi_exception( + e, str(statement), parameters, None, context + ) cursor, str_statement, parameters = ( context.cursor, @@ -1840,13 +1887,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): ) else: self._log_info( - "[%s] [SQL parameters hidden due to hide_parameters=True]" - % (stats,) + "[%s] [SQL parameters hidden due to hide_parameters=True]", + stats, ) evt_handled: bool = False try: - if context.executemany: + if context.execute_style is ExecuteStyle.EXECUTEMANY: effective_parameters = cast( "_CoreMultiExecuteParams", effective_parameters ) @@ -1862,7 +1909,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): break if not evt_handled: self.dialect.do_executemany( - cursor, str_statement, effective_parameters, context + cursor, + str_statement, + effective_parameters, + context, ) elif not effective_parameters and context.no_parameters: if self.dialect._has_events: @@ -1914,6 +1964,151 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): return result + def _exec_insertmany_context( + self, + dialect: Dialect, + context: ExecutionContext, + ) -> CursorResult[Any]: + """continue the _execute_context() method for an "insertmanyvalues" + operation, which will invoke DBAPI + cursor.execute() one or more times with individual log and + event hook calls. 
+ + """ + + if dialect.bind_typing is BindTyping.SETINPUTSIZES: + generic_setinputsizes = context._prepare_set_input_sizes() + else: + generic_setinputsizes = None + + cursor, str_statement, parameters = ( + context.cursor, + context.statement, + context.parameters, + ) + + effective_parameters = parameters + + engine_events = self._has_events or self.engine._has_events + if self.dialect._has_events: + do_execute_dispatch: Iterable[ + Any + ] = self.dialect.dispatch.do_execute + else: + do_execute_dispatch = () + + if self._echo: + stats = context._get_cache_stats() + " (insertmanyvalues)" + for ( + sub_stmt, + sub_params, + setinputsizes, + batchnum, + totalbatches, + ) in dialect._deliver_insertmanyvalues_batches( + cursor, + str_statement, + effective_parameters, + generic_setinputsizes, + context, + ): + + if setinputsizes: + try: + dialect.do_set_input_sizes( + context.cursor, setinputsizes, context + ) + except BaseException as e: + self._handle_dbapi_exception( + e, + sql_util._long_statement(sub_stmt), + sub_params, + None, + context, + ) + + if engine_events: + for fn in self.dispatch.before_cursor_execute: + sub_stmt, sub_params = fn( + self, + cursor, + sub_stmt, + sub_params, + context, + True, + ) + + if self._echo: + + self._log_info(sql_util._long_statement(sub_stmt)) + + if batchnum > 1: + stats = ( + f"insertmanyvalues batch {batchnum} " + f"of {totalbatches}" + ) + + if not self.engine.hide_parameters: + self._log_info( + "[%s] %r", + stats, + sql_util._repr_params( + sub_params, + batches=10, + ismulti=False, + ), + ) + else: + self._log_info( + "[%s] [SQL parameters hidden due to " + "hide_parameters=True]", + stats, + ) + + try: + for fn in do_execute_dispatch: + if fn( + cursor, + sub_stmt, + sub_params, + context, + ): + break + else: + dialect.do_execute(cursor, sub_stmt, sub_params, context) + + except BaseException as e: + self._handle_dbapi_exception( + e, + sql_util._long_statement(sub_stmt), + sub_params, + cursor, + context, + is_sub_exec=True, + ) + + if engine_events: + self.dispatch.after_cursor_execute( + self, + cursor, + str_statement, + effective_parameters, + context, + context.executemany, + ) + + try: + context.post_exec() + + result = context._setup_result_proxy() + + except BaseException as e: + self._handle_dbapi_exception( + e, str_statement, effective_parameters, cursor, context + ) + + return result + def _cursor_execute( self, cursor: DBAPICursor, @@ -1983,6 +2178,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): parameters: Optional[_AnyExecuteParams], cursor: Optional[DBAPICursor], context: Optional[ExecutionContext], + is_sub_exec: bool = False, ) -> NoReturn: exc_info = sys.exc_info() @@ -2001,6 +2197,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): invalidate_pool_on_disconnect = not is_exit_exception + ismulti: bool = ( + not is_sub_exec and context.executemany + if context is not None + else False + ) if self._reentrant_error: raise exc.DBAPIError.instance( statement, @@ -2009,7 +2210,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): self.dialect.loaded_dbapi.Error, hide_parameters=self.engine.hide_parameters, dialect=self.dialect, - ismulti=context.executemany if context is not None else None, + ismulti=ismulti, ).with_traceback(exc_info[2]) from e self._reentrant_error = True try: @@ -2030,9 +2231,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): hide_parameters=self.engine.hide_parameters, 
connection_invalidated=self._is_disconnect, dialect=self.dialect, - ismulti=context.executemany - if context is not None - else None, + ismulti=ismulti, ) else: sqlalchemy_exception = None diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py index 36119ab24..a9b388d71 100644 --- a/lib/sqlalchemy/engine/create.py +++ b/lib/sqlalchemy/engine/create.py @@ -58,6 +58,7 @@ def create_engine( future: Literal[True], hide_parameters: bool = ..., implicit_returning: Literal[True] = ..., + insertmanyvalues_page_size: int = ..., isolation_level: _IsolationLevel = ..., json_deserializer: Callable[..., Any] = ..., json_serializer: Callable[..., Any] = ..., @@ -79,6 +80,7 @@ def create_engine( pool_use_lifo: bool = ..., plugins: List[str] = ..., query_cache_size: int = ..., + use_insertmanyvalues: bool = ..., **kwargs: Any, ) -> Engine: ... @@ -273,6 +275,23 @@ def create_engine(url: Union[str, "_url.URL"], **kwargs: Any) -> Engine: :paramref:`.Table.implicit_returning` parameter. + :param insertmanyvalues_page_size: number of rows to format into an + INSERT statement when the statement uses "insertmanyvalues" mode, which is + a paged form of bulk insert that is used for many backends when using + :term:`executemany` execution typically in conjunction with RETURNING. + Defaults to 1000, but may also be subject to dialect-specific limiting + factors which may override this value on a per-statement basis. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`engine_insertmanyvalues` + + :ref:`engine_insertmanyvalues_page_size` + + :paramref:`_engine.Connection.execution_options.insertmanyvalues_page_size` + :param isolation_level: optional string name of an isolation level which will be set on all new connections unconditionally. Isolation levels are typically some subset of the string names @@ -508,6 +527,15 @@ def create_engine(url: Union[str, "_url.URL"], **kwargs: Any) -> Engine: .. versionadded:: 1.4 + :param use_insertmanyvalues: True by default, use the "insertmanyvalues" + execution style for INSERT..RETURNING statements by default. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`engine_insertmanyvalues` + """ # noqa if "strategy" in kwargs: diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 3a53f8157..11ab713d0 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -35,6 +35,7 @@ from typing import Set from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import Union import weakref from . import characteristics @@ -44,6 +45,7 @@ from .base import Connection from .interfaces import CacheStats from .interfaces import DBAPICursor from .interfaces import Dialect +from .interfaces import ExecuteStyle from .interfaces import ExecutionContext from .reflection import ObjectKind from .reflection import ObjectScope @@ -52,13 +54,16 @@ from .. import exc from .. import pool from .. 
import util from ..sql import compiler +from ..sql import dml from ..sql import expression from ..sql import type_api from ..sql._typing import is_tuple_type +from ..sql.base import _NoArg from ..sql.compiler import DDLCompiler from ..sql.compiler import SQLCompiler from ..sql.elements import quoted_name from ..sql.schema import default_is_scalar +from ..util.typing import Final from ..util.typing import Literal if typing.TYPE_CHECKING: @@ -146,7 +151,6 @@ class DefaultDialect(Dialect): update_returning_multifrom = False delete_returning_multifrom = False insert_returning = False - insert_executemany_returning = False cte_follows_insert = False @@ -208,6 +212,10 @@ class DefaultDialect(Dialect): supports_default_metavalue = False """dialect supports INSERT... VALUES (DEFAULT) syntax""" + default_metavalue_token = "DEFAULT" + """for INSERT... VALUES (DEFAULT) syntax, the token to put in the + parenthesis.""" + # not sure if this is a real thing but the compiler will deliver it # if this is the only flag enabled. supports_empty_insert = True @@ -215,6 +223,13 @@ class DefaultDialect(Dialect): supports_multivalues_insert = False + use_insertmanyvalues: bool = False + + use_insertmanyvalues_wo_returning: bool = False + + insertmanyvalues_page_size: int = 1000 + insertmanyvalues_max_parameters = 32700 + supports_is_distinct_from = True supports_server_side_cursors = False @@ -272,6 +287,8 @@ class DefaultDialect(Dialect): supports_native_boolean: Optional[bool] = None, max_identifier_length: Optional[int] = None, label_length: Optional[int] = None, + insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, + use_insertmanyvalues: Optional[bool] = None, # util.deprecated_params decorator cannot render the # Linting.NO_LINTING constant compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore @@ -332,6 +349,12 @@ class DefaultDialect(Dialect): self.label_length = label_length self.compiler_linting = compiler_linting + if use_insertmanyvalues is not None: + self.use_insertmanyvalues = use_insertmanyvalues + + if insertmanyvalues_page_size is not _NoArg.NO_ARG: + self.insertmanyvalues_page_size = insertmanyvalues_page_size + @util.deprecated_property( "2.0", "full_returning is deprecated, please use insert_returning, " @@ -344,6 +367,17 @@ class DefaultDialect(Dialect): and self.delete_returning ) + @property + def insert_executemany_returning(self): + return ( + self.insert_returning + and self.supports_multivalues_insert + and self.use_insertmanyvalues + ) + + update_executemany_returning = False + delete_executemany_returning = False + @util.memoized_property def loaded_dbapi(self) -> ModuleType: if self.dbapi is None: @@ -682,6 +716,27 @@ class DefaultDialect(Dialect): def do_release_savepoint(self, connection, name): connection.execute(expression.ReleaseSavepointClause(name)) + def _deliver_insertmanyvalues_batches( + self, cursor, statement, parameters, generic_setinputsizes, context + ): + context = cast(DefaultExecutionContext, context) + compiled = cast(SQLCompiler, context.compiled) + + is_returning: Final[bool] = bool(compiled.effective_returning) + batch_size = context.execution_options.get( + "insertmanyvalues_page_size", self.insertmanyvalues_page_size + ) + + if is_returning: + context._insertmanyvalues_rows = result = [] + + for batch_rec in compiled._deliver_insertmanyvalues_batches( + statement, parameters, generic_setinputsizes, batch_size + ): + yield batch_rec + if is_returning: + result.extend(cursor.fetchall()) + def do_executemany(self, cursor, statement, 
parameters, context=None): cursor.executemany(statement, parameters) @@ -936,7 +991,8 @@ class DefaultExecutionContext(ExecutionContext): is_text = False isddl = False - executemany = False + execute_style: ExecuteStyle = ExecuteStyle.EXECUTE + compiled: Optional[Compiled] = None result_column_struct: Optional[ Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] @@ -982,6 +1038,8 @@ class DefaultExecutionContext(ExecutionContext): _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) + _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None + @classmethod def _init_ddl( cls, @@ -1061,23 +1119,55 @@ class DefaultExecutionContext(ExecutionContext): compiled._loose_column_name_matching, ) - self.isinsert = compiled.isinsert - self.isupdate = compiled.isupdate - self.isdelete = compiled.isdelete + self.isinsert = ii = compiled.isinsert + self.isupdate = iu = compiled.isupdate + self.isdelete = id_ = compiled.isdelete self.is_text = compiled.isplaintext - if self.isinsert or self.isupdate or self.isdelete: + if ii or iu or id_: if TYPE_CHECKING: assert isinstance(compiled.statement, UpdateBase) self.is_crud = True - self._is_explicit_returning = bool(compiled.statement._returning) - self._is_implicit_returning = is_implicit_returning = bool( + self._is_explicit_returning = ier = bool( + compiled.statement._returning + ) + self._is_implicit_returning = iir = is_implicit_returning = bool( compiled.implicit_returning ) assert not ( is_implicit_returning and compiled.statement._returning ) + if (ier or iir) and compiled.for_executemany: + if ii and not self.dialect.insert_executemany_returning: + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "INSERT..RETURNING when executemany is used" + ) + elif ( + ii + and self.dialect.use_insertmanyvalues + and not compiled._insertmanyvalues + ): + raise exc.InvalidRequestError( + 'Statement does not have "insertmanyvalues" ' + "enabled, can't use INSERT..RETURNING with " + "executemany in this case." 
+ ) + elif iu and not self.dialect.update_executemany_returning: + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "UPDATE..RETURNING when executemany is used" + ) + elif id_ and not self.dialect.delete_executemany_returning: + raise exc.InvalidRequestError( + f"Dialect {self.dialect.dialect_description} with " + f"current server capabilities does not support " + "DELETE..RETURNING when executemany is used" + ) + if not parameters: self.compiled_parameters = [ compiled.construct_params( @@ -1096,7 +1186,11 @@ class DefaultExecutionContext(ExecutionContext): for grp, m in enumerate(parameters) ] - self.executemany = len(parameters) > 1 + if len(parameters) > 1: + if self.isinsert and compiled._insertmanyvalues: + self.execute_style = ExecuteStyle.INSERTMANYVALUES + else: + self.execute_style = ExecuteStyle.EXECUTEMANY self.unicode_statement = compiled.string @@ -1238,7 +1332,8 @@ class DefaultExecutionContext(ExecutionContext): dialect.execute_sequence_format(p) for p in parameters ] - self.executemany = len(parameters) > 1 + if len(parameters) > 1: + self.execute_style = ExecuteStyle.EXECUTEMANY self.statement = self.unicode_statement = statement @@ -1293,6 +1388,13 @@ class DefaultExecutionContext(ExecutionContext): else: return "unknown" + @property + def executemany(self): + return self.execute_style in ( + ExecuteStyle.EXECUTEMANY, + ExecuteStyle.INSERTMANYVALUES, + ) + @util.memoized_property def identifier_preparer(self): if self.compiled: @@ -1555,7 +1657,23 @@ class DefaultExecutionContext(ExecutionContext): def _setup_dml_or_text_result(self): compiled = cast(SQLCompiler, self.compiled) + strategy = self.cursor_fetch_strategy + if self.isinsert: + if ( + self.execute_style is ExecuteStyle.INSERTMANYVALUES + and compiled.effective_returning + ): + strategy = _cursor.FullyBufferedCursorFetchStrategy( + self.cursor, + initial_buffer=self._insertmanyvalues_rows, + # maintain alt cursor description if set by the + # dialect, e.g. mssql preserves it + alternate_description=( + strategy.alternate_cursor_description + ), + ) + if compiled.postfetch_lastrowid: self.inserted_primary_key_rows = ( self._setup_ins_pk_from_lastrowid() @@ -1564,7 +1682,6 @@ class DefaultExecutionContext(ExecutionContext): # the default inserted_primary_key_rows accessor will # return an "empty" primary key collection when accessed. - strategy = self.cursor_fetch_strategy if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: strategy = _cursor.BufferedRowCursorFetchStrategy( self.cursor, self.execution_options @@ -1675,8 +1792,11 @@ class DefaultExecutionContext(ExecutionContext): cast(SQLCompiler, self.compiled).postfetch ) - def _set_input_sizes(self): - """Given a cursor and ClauseParameters, call the appropriate + def _prepare_set_input_sizes( + self, + ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: + """Given a cursor and ClauseParameters, prepare arguments + in order to call the appropriate style of ``setinputsizes()`` on the cursor, using DB-API types from the bind parameter's ``TypeEngine`` objects. 
@@ -1691,14 +1811,14 @@ class DefaultExecutionContext(ExecutionContext): """ if self.isddl or self.is_text: - return + return None compiled = cast(SQLCompiler, self.compiled) inputsizes = compiled._get_set_input_sizes_lookup() if inputsizes is None: - return + return None dialect = self.dialect @@ -1775,12 +1895,8 @@ class DefaultExecutionContext(ExecutionContext): generic_inputsizes.append( (escaped_name, dbtype, bindparam.type) ) - try: - dialect.do_set_input_sizes(self.cursor, generic_inputsizes, self) - except BaseException as e: - self.root_connection._handle_dbapi_exception( - e, None, None, None, self - ) + + return generic_inputsizes def _exec_default(self, column, default, type_): if default.is_sequence: @@ -1906,7 +2022,7 @@ class DefaultExecutionContext(ExecutionContext): assert compile_state is not None if ( isolate_multiinsert_groups - and self.isinsert + and dml.isinsert(compile_state) and compile_state._has_multi_parameters ): if column._is_multiparam_column: diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 01b266d68..fb59acbd0 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -18,6 +18,7 @@ from typing import ClassVar from typing import Collection from typing import Dict from typing import Iterable +from typing import Iterator from typing import List from typing import Mapping from typing import MutableMapping @@ -62,6 +63,7 @@ if TYPE_CHECKING: from ..sql.elements import ClauseElement from ..sql.schema import Column from ..sql.schema import DefaultGenerator + from ..sql.schema import SchemaItem from ..sql.schema import Sequence as Sequence_SchemaItem from ..sql.sqltypes import Integer from ..sql.type_api import _TypeMemoDict @@ -80,6 +82,28 @@ class CacheStats(Enum): NO_DIALECT_SUPPORT = 4 +class ExecuteStyle(Enum): + """indicates the :term:`DBAPI` cursor method that will be used to invoke + a statement.""" + + EXECUTE = 0 + """indicates cursor.execute() will be used""" + + EXECUTEMANY = 1 + """indicates cursor.executemany() will be used.""" + + INSERTMANYVALUES = 2 + """indicates cursor.execute() will be used with an INSERT where the + VALUES expression will be expanded to accommodate for multiple + parameter sets + + .. seealso:: + + :ref:`engine_insertmanyvalues` + + """ + + class DBAPIConnection(Protocol): """protocol representing a :pep:`249` database connection. @@ -235,6 +259,8 @@ _ImmutableExecuteOptions = immutabledict[str, Any] _ParamStyle = Literal["qmark", "numeric", "named", "format", "pyformat"] +_GenericSetInputSizesType = List[Tuple[str, Any, "TypeEngine[Any]"]] + _IsolationLevel = Literal[ "SERIALIZABLE", "REPEATABLE READ", @@ -608,6 +634,8 @@ class Dialect(EventTarget): driver: str """identifying name for the dialect's DBAPI""" + dialect_description: str + dbapi: Optional[ModuleType] """A reference to the DBAPI module object itself. @@ -748,23 +776,125 @@ class Dialect(EventTarget): executemany. """ + supports_empty_insert: bool + """dialect supports INSERT () VALUES (), i.e. a plain INSERT with no + columns in it. + + This is not usually supported; an "empty" insert is typically + suited using either "INSERT..DEFAULT VALUES" or + "INSERT ... (col) VALUES (DEFAULT)". + + """ + supports_default_values: bool """dialect supports INSERT... DEFAULT VALUES syntax""" supports_default_metavalue: bool - """dialect supports INSERT... VALUES (DEFAULT) syntax""" + """dialect supports INSERT...(col) VALUES (DEFAULT) syntax. 
- supports_empty_insert: bool - """dialect supports INSERT () VALUES ()""" + Most databases support this in some way, e.g. SQLite supports it using + ``VALUES (NULL)``. MS SQL Server supports the syntax also however + is the only included dialect where we have this disabled, as + MSSQL does not support the field for the IDENTITY column, which is + usually where we like to make use of the feature. + + """ + + default_metavalue_token: str = "DEFAULT" + """for INSERT... VALUES (DEFAULT) syntax, the token to put in the + parenthesis. + + E.g. for SQLite this is the keyword "NULL". + + """ supports_multivalues_insert: bool """Target database supports INSERT...VALUES with multiple value - sets""" + sets, i.e. INSERT INTO table (cols) VALUES (...), (...), (...), ... + + """ + + insert_executemany_returning: bool + """dialect / driver / database supports some means of providing + INSERT...RETURNING support when dialect.do_executemany() is used. + + """ + + update_executemany_returning: bool + """dialect supports UPDATE..RETURNING with executemany.""" + + delete_executemany_returning: bool + """dialect supports DELETE..RETURNING with executemany.""" + + use_insertmanyvalues: bool + """if True, indicates "insertmanyvalues" functionality should be used + to allow for ``insert_executemany_returning`` behavior, if possible. + + In practice, setting this to True means: + + if ``supports_multivalues_insert``, ``insert_returning`` and + ``use_insertmanyvalues`` are all True, the SQL compiler will produce + an INSERT that will be interpreted by the :class:`.DefaultDialect` + as an :attr:`.ExecuteStyle.INSERTMANYVALUES` execution that allows + for INSERT of many rows with RETURNING by rewriting a single-row + INSERT statement to have multiple VALUES clauses, also executing + the statement multiple times for a series of batches when large numbers + of rows are given. + + The parameter is False for the default dialect, and is set to + True for SQLAlchemy internal dialects SQLite, MySQL/MariaDB, PostgreSQL, + SQL Server. It remains at False for Oracle, which provides native + "executemany with RETURNING" support and also does not support + ``supports_multivalues_insert``. For MySQL/MariaDB, those MySQL + dialects that don't support RETURNING will not report + ``insert_executemany_returning`` as True. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`engine_insertmanyvalues` + + """ + + use_insertmanyvalues_wo_returning: bool + """if True, and use_insertmanyvalues is also True, INSERT statements + that don't include RETURNING will also use "insertmanyvalues". + + .. versionadded:: 2.0 + + """ + + insertmanyvalues_page_size: int + """Number of rows to render into an individual INSERT..VALUES() statement + for :attr:`.ExecuteStyle.INSERTMANYVALUES` executions. + + The default dialect defaults this to 1000. + + .. versionadded:: 2.0 + + .. seealso:: + + :paramref:`_engine.Connection.execution_options.insertmanyvalues_page_size` - + execution option available on :class:`_engine.Connection`, statements + + """ # noqa: E501 + + insertmanyvalues_max_parameters: int + """Alternate to insertmanyvalues_page_size, will additionally limit + page size based on number of parameters total in the statement. + + + """ preexecute_autoincrement_sequences: bool """True if 'implicit' primary key functions must be executed separately - in order to get their value. This is currently oriented towards - PostgreSQL. + in order to get their value, if RETURNING is not used. 
+ + This is currently oriented towards PostgreSQL when the + ``implicit_returning=False`` parameter is used on a :class:`.Table` + object. + """ insert_returning: bool @@ -810,6 +940,13 @@ class Dialect(EventTarget): """ + supports_identity_columns: bool + """target database supports IDENTITY""" + + cte_follows_insert: bool + """target database, when given a CTE with an INSERT statement, needs + the CTE to be below the INSERT""" + colspecs: MutableMapping[Type["TypeEngine[Any]"], Type["TypeEngine[Any]"]] """A dictionary of TypeEngine classes from sqlalchemy.types mapped to subclasses that are specific to the dialect class. This @@ -860,7 +997,7 @@ class Dialect(EventTarget): """ construct_arguments: Optional[ - List[Tuple[Type[ClauseElement], Mapping[str, Any]]] + List[Tuple[Type[Union[SchemaItem, ClauseElement]], Mapping[str, Any]]] ] = None """Optional set of argument specifiers for various SQLAlchemy constructs, typically schema items. @@ -1007,19 +1144,6 @@ class Dialect(EventTarget): _bind_typing_render_casts: bool - supports_identity_columns: bool - """target database supports IDENTITY""" - - cte_follows_insert: bool - """target database, when given a CTE with an INSERT statement, needs - the CTE to be below the INSERT""" - - insert_executemany_returning: bool - """dialect / driver / database supports some means of providing RETURNING - support when dialect.do_executemany() is used. - - """ - _type_memos: MutableMapping[TypeEngine[Any], "_TypeMemoDict"] def _builtin_onconnect(self) -> Optional[_ListenerFnType]: @@ -1826,8 +1950,8 @@ class Dialect(EventTarget): def do_set_input_sizes( self, cursor: DBAPICursor, - list_of_tuples: List[Tuple[str, Any, TypeEngine[Any]]], - context: "ExecutionContext", + list_of_tuples: _GenericSetInputSizesType, + context: ExecutionContext, ) -> Any: """invoke the cursor.setinputsizes() method with appropriate arguments @@ -1961,12 +2085,35 @@ class Dialect(EventTarget): raise NotImplementedError() + def _deliver_insertmanyvalues_batches( + self, + cursor: DBAPICursor, + statement: str, + parameters: _DBAPIMultiExecuteParams, + generic_setinputsizes: Optional[_GenericSetInputSizesType], + context: ExecutionContext, + ) -> Iterator[ + Tuple[ + str, + _DBAPISingleExecuteParams, + _GenericSetInputSizesType, + int, + int, + ] + ]: + """convert executemany parameters for an INSERT into an iterator + of statement/single execute values, used by the insertmanyvalues + feature. + + """ + raise NotImplementedError() + def do_executemany( self, cursor: DBAPICursor, statement: str, parameters: _DBAPIMultiExecuteParams, - context: Optional["ExecutionContext"] = None, + context: Optional[ExecutionContext] = None, ) -> None: """Provide an implementation of ``cursor.executemany(statement, parameters)``.""" @@ -2743,7 +2890,9 @@ class ExecutionContext: These are always stored as a list of parameter entries. A single-element list corresponds to a ``cursor.execute()`` call and a multiple-element - list corresponds to ``cursor.executemany()``. + list corresponds to ``cursor.executemany()``, except in the case + of :attr:`.ExecuteStyle.INSERTMANYVALUES` which will use + ``cursor.execute()`` one or more times. """ @@ -2756,8 +2905,23 @@ class ExecutionContext: isupdate: bool """True if the statement is an UPDATE.""" + execute_style: ExecuteStyle + """the style of DBAPI cursor method that will be used to execute + a statement. + + .. 
versionadded:: 2.0 + + """ + executemany: bool - """True if the parameters have determined this to be an executemany""" + """True if the context has a list of more than one parameter set. + + Historically this attribute links to whether ``cursor.execute()`` or + ``cursor.executemany()`` will be used. It also can now mean that + "insertmanyvalues" may be used which indicates one or more + ``cursor.execute()`` calls. + + """ prefetch_cols: util.generic_fn_descriptor[Optional[Sequence[Column[Any]]]] """a list of Column objects for which a client-side default @@ -2824,7 +2988,9 @@ class ExecutionContext: ) -> Any: raise NotImplementedError() - def _set_input_sizes(self) -> None: + def _prepare_set_input_sizes( + self, + ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: raise NotImplementedError() def _get_cache_stats(self) -> str: diff --git a/lib/sqlalchemy/orm/context.py b/lib/sqlalchemy/orm/context.py index 4f24103df..dc96f8c3c 100644 --- a/lib/sqlalchemy/orm/context.py +++ b/lib/sqlalchemy/orm/context.py @@ -776,6 +776,10 @@ class FromStatement(GroupedElement, Generative, TypedReturnsRows[_TP]): return self.element._all_selected_columns @property + def _return_defaults(self): + return self.element._return_defaults if is_dml(self.element) else None + + @property def _returning(self): return self.element._returning if is_dml(self.element) else None diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py index 6310f5b1b..abd528986 100644 --- a/lib/sqlalchemy/orm/persistence.py +++ b/lib/sqlalchemy/orm/persistence.py @@ -1489,7 +1489,7 @@ def _postfetch( prefetch_cols = result.context.compiled.prefetch postfetch_cols = result.context.compiled.postfetch - returning_cols = result.context.compiled.returning + returning_cols = result.context.compiled.effective_returning if ( mapper.version_id_col is not None diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index 20ffb385d..a690da0d5 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -3989,14 +3989,6 @@ class Session(_SessionClassMethods, EventTarget): and SQL clause support are **silently omitted** in favor of raw INSERT/UPDATES of records. - Please note that newer versions of SQLAlchemy are **greatly - improving the efficiency** of the standard flush process. It is - **strongly recommended** to not use the bulk methods as they - represent a forking of SQLAlchemy's functionality and are slowly - being moved into legacy status. New features such as - :ref:`orm_dml_returning_objects` are both more efficient than - the "bulk" methods and provide more predictable functionality. - **Please read the list of caveats at** :ref:`bulk_operations_caveats` **before using this method, and fully test and confirm the functionality of all code developed @@ -4108,8 +4100,6 @@ class Session(_SessionClassMethods, EventTarget): organizing the values within them across the tables to which the given mapper is mapped. - .. versionadded:: 1.0.0 - .. warning:: The bulk insert feature allows for a lower-latency INSERT @@ -4118,14 +4108,6 @@ class Session(_SessionClassMethods, EventTarget): and SQL clause support are **silently omitted** in favor of raw INSERT of records. - Please note that newer versions of SQLAlchemy are **greatly - improving the efficiency** of the standard flush process. It is - **strongly recommended** to not use the bulk methods as they - represent a forking of SQLAlchemy's functionality and are slowly - being moved into legacy status. 
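The revised meaning of ``executemany`` can be seen from the Core level; a
rough sketch, assuming an in-memory SQLite engine recent enough to support
RETURNING (SQLite 3.35+)::

    from sqlalchemy import (
        Column,
        Integer,
        MetaData,
        String,
        Table,
        create_engine,
    )

    metadata = MetaData()
    t = Table(
        "t",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("x", String(50)),
    )

    engine = create_engine("sqlite://")
    metadata.create_all(engine)

    with engine.begin() as conn:
        # a list of parameter dictionaries is an "executemany" from the
        # Core point of view, yet with RETURNING present it is delivered
        # as one or more cursor.execute() calls under INSERTMANYVALUES
        result = conn.execute(
            t.insert().returning(t.c.id),
            [{"x": "a"}, {"x": "b"}, {"x": "c"}],
        )
        print(result.all())  # [(1,), (2,), (3,)]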
New features such as - :ref:`orm_dml_returning_objects` are both more efficient than - the "bulk" methods and provide more predictable functionality. - **Please read the list of caveats at** :ref:`bulk_operations_caveats` **before using this method, and fully test and confirm the functionality of all code developed @@ -4142,19 +4124,18 @@ class Session(_SessionClassMethods, EventTarget): such as a joined-inheritance mapping, each dictionary must contain all keys to be populated into all tables. - :param return_defaults: when True, rows that are missing values which - generate defaults, namely integer primary key defaults and sequences, - will be inserted **one at a time**, so that the primary key value - is available. In particular this will allow joined-inheritance - and other multi-table mappings to insert correctly without the need - to provide primary - key values ahead of time; however, - :paramref:`.Session.bulk_insert_mappings.return_defaults` - **greatly reduces the performance gains** of the method overall. - If the rows - to be inserted only refer to a single table, then there is no - reason this flag should be set as the returned default information - is not used. + :param return_defaults: when True, the INSERT process will be altered + to ensure that newly generated primary key values will be fetched. + The rationale for this parameter is typically to enable + :ref:`Joined Table Inheritance <joined_inheritance>` mappings to + be bulk inserted. + + .. note:: for backends that don't support RETURNING, the + :paramref:`_orm.Session.bulk_insert_mappings.return_defaults` + parameter can significantly decrease performance as INSERT + statements can no longer be batched. See + :ref:`engine_insertmanyvalues` + for background on which backends are affected. :param render_nulls: When True, a value of ``None`` will result in a NULL value being included in the INSERT statement, rather @@ -4178,8 +4159,6 @@ class Session(_SessionClassMethods, EventTarget): to ensure that no server-side default functions need to be invoked for the operation as a whole. - .. versionadded:: 1.1 - .. seealso:: :ref:`bulk_operations` @@ -4211,8 +4190,6 @@ class Session(_SessionClassMethods, EventTarget): state management features in use, reducing latency when updating large numbers of simple rows. - .. versionadded:: 1.0.0 - .. warning:: The bulk update feature allows for a lower-latency UPDATE @@ -4221,14 +4198,6 @@ class Session(_SessionClassMethods, EventTarget): and SQL clause support are **silently omitted** in favor of raw UPDATES of records. - Please note that newer versions of SQLAlchemy are **greatly - improving the efficiency** of the standard flush process. It is - **strongly recommended** to not use the bulk methods as they - represent a forking of SQLAlchemy's functionality and are slowly - being moved into legacy status. New features such as - :ref:`orm_dml_returning_objects` are both more efficient than - the "bulk" methods and provide more predictable functionality. 
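A rough usage sketch of the
:paramref:`.Session.bulk_insert_mappings.return_defaults` behavior described
above; the ``User`` model and in-memory engine are illustrative::

    from sqlalchemy import create_engine
    from sqlalchemy.orm import (
        DeclarativeBase,
        Mapped,
        Session,
        mapped_column,
    )

    class Base(DeclarativeBase):
        pass

    class User(Base):
        __tablename__ = "user_account"
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str]

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        mappings = [{"name": "u1"}, {"name": "u2"}]
        # return_defaults=True fetches new primary keys into the given
        # dictionaries; RETURNING-enabled backends can still batch this
        session.bulk_insert_mappings(User, mappings, return_defaults=True)
        print([m["id"] for m in mappings])  # [1, 2]
        session.commit()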
- **Please read the list of caveats at** :ref:`bulk_operations_caveats` **before using this method, and fully test and confirm the functionality of all code developed diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 1d13ffa9a..201324a2a 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -94,6 +94,7 @@ if typing.TYPE_CHECKING: from .elements import BindParameter from .elements import ColumnClause from .elements import ColumnElement + from .elements import KeyedColumnElement from .elements import Label from .functions import Function from .selectable import AliasedReturnsRows @@ -390,6 +391,19 @@ class _CompilerStackEntry(_BaseCompilerStackEntry, total=False): class ExpandedState(NamedTuple): + """represents state to use when producing "expanded" and + "post compile" bound parameters for a statement. + + "expanded" parameters are parameters that are generated at + statement execution time to suit a number of parameters passed, the most + prominent example being the individual elements inside of an IN expression. + + "post compile" parameters are parameters where the SQL literal value + will be rendered into the SQL statement at execution time, rather than + being passed as separate parameters to the driver. + + """ + statement: str additional_parameters: _CoreSingleExecuteParams processors: Mapping[str, _BindProcessorType[Any]] @@ -397,7 +411,23 @@ class ExpandedState(NamedTuple): parameter_expansion: Mapping[str, List[str]] +class _InsertManyValues(NamedTuple): + """represents state to use for executing an "insertmanyvalues" statement""" + + is_default_expr: bool + single_values_expr: str + insert_crud_params: List[Tuple[KeyedColumnElement[Any], str, str]] + num_positional_params_counted: int + + class Linting(IntEnum): + """represent preferences for the 'SQL linting' feature. + + this feature currently includes support for flagging cartesian products + in SQL statements. + + """ + NO_LINTING = 0 "Disable all linting." @@ -419,6 +449,9 @@ NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple( class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])): + """represents current state for the "cartesian product" detection + feature.""" + def lint(self, start=None): froms = self.froms if not froms: @@ -762,8 +795,6 @@ class SQLCompiler(Compiled): is_sql = True - _result_columns: List[ResultColumnsEntry] - compound_keywords = COMPOUND_KEYWORDS isdelete: bool = False @@ -810,12 +841,6 @@ class SQLCompiler(Compiled): """major statements such as SELECT, INSERT, UPDATE, DELETE are tracked in this stack using an entry format.""" - result_columns: List[ResultColumnsEntry] - """relates label names in the final SQL to a tuple of local - column/label name, ColumnElement object (if any) and - TypeEngine. CursorResult uses this for type processing and - column targeting""" - returning_precedes_values: bool = False """set to True classwide to generate RETURNING clauses before the VALUES or WHERE clause (i.e. MSSQL) @@ -835,6 +860,12 @@ class SQLCompiler(Compiled): driver/DB enforces this """ + _result_columns: List[ResultColumnsEntry] + """relates label names in the final SQL to a tuple of local + column/label name, ColumnElement object (if any) and + TypeEngine. CursorResult uses this for type processing and + column targeting""" + _textual_ordered_columns: bool = False """tell the result object that the column names as rendered are important, but they are also "ordered" vs. 
what is in the compiled object here. @@ -881,14 +912,9 @@ class SQLCompiler(Compiled): """ - insert_single_values_expr: Optional[str] = None - """When an INSERT is compiled with a single set of parameters inside - a VALUES expression, the string is assigned here, where it can be - used for insert batching schemes to rewrite the VALUES expression. + _insertmanyvalues: Optional[_InsertManyValues] = None - .. versionadded:: 1.3.8 - - """ + _insert_crud_params: Optional[crud._CrudParamSequence] = None literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset() """bindparameter objects that are rendered as literal values at statement @@ -1072,6 +1098,25 @@ class SQLCompiler(Compiled): if self._render_postcompile: self._process_parameters_for_postcompile(_populate_self=True) + @property + def insert_single_values_expr(self) -> Optional[str]: + """When an INSERT is compiled with a single set of parameters inside + a VALUES expression, the string is assigned here, where it can be + used for insert batching schemes to rewrite the VALUES expression. + + .. versionadded:: 1.3.8 + + .. versionchanged:: 2.0 This collection is no longer used by + SQLAlchemy's built-in dialects, in favor of the currently + internal ``_insertmanyvalues`` collection that is used only by + :class:`.SQLCompiler`. + + """ + if self._insertmanyvalues is None: + return None + else: + return self._insertmanyvalues.single_values_expr + @util.ro_memoized_property def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]: """The effective "returning" columns for INSERT, UPDATE or DELETE. @@ -1620,10 +1665,13 @@ class SQLCompiler(Compiled): param_key_getter = self._within_exec_param_key_getter + assert self.compile_state is not None + statement = self.compile_state.statement + if TYPE_CHECKING: - assert isinstance(self.statement, Insert) + assert isinstance(statement, Insert) - table = self.statement.table + table = statement.table getters = [ (operator.methodcaller("get", param_key_getter(col), None), col) @@ -1697,11 +1745,14 @@ class SQLCompiler(Compiled): else: result = util.preloaded.engine_result + assert self.compile_state is not None + statement = self.compile_state.statement + if TYPE_CHECKING: - assert isinstance(self.statement, Insert) + assert isinstance(statement, Insert) param_key_getter = self._within_exec_param_key_getter - table = self.statement.table + table = statement.table returning = self.implicit_returning assert returning is not None @@ -4506,7 +4557,202 @@ class SQLCompiler(Compiled): ) return dialect_hints, table_text + def _insert_stmt_should_use_insertmanyvalues(self, statement): + return ( + self.dialect.supports_multivalues_insert + and self.dialect.use_insertmanyvalues + # note self.implicit_returning or self._result_columns + # implies self.dialect.insert_returning capability + and ( + self.dialect.use_insertmanyvalues_wo_returning + or self.implicit_returning + or self._result_columns + ) + ) + + def _deliver_insertmanyvalues_batches( + self, statement, parameters, generic_setinputsizes, batch_size + ): + imv = self._insertmanyvalues + assert imv is not None + + executemany_values = f"({imv.single_values_expr})" + + lenparams = len(parameters) + if imv.is_default_expr and not self.dialect.supports_default_metavalue: + # backend doesn't support + # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ... + # at the moment this is basically SQL Server due to + # not being able to use DEFAULT for identity column + # just yield out that many single statements! 
still + # faster than a whole connection.execute() call ;) + # + # note we still are taking advantage of the fact that we know + # we are using RETURNING. The generalized approach of fetching + # cursor.lastrowid etc. still goes through the more heavyweight + # "ExecutionContext per statement" system as it isn't usable + # as a generic "RETURNING" approach + for batchnum, param in enumerate(parameters, 1): + yield ( + statement, + param, + generic_setinputsizes, + batchnum, + lenparams, + ) + return + else: + statement = statement.replace( + executemany_values, "__EXECMANY_TOKEN__" + ) + + # Use optional insertmanyvalues_max_parameters + # to further shrink the batch size so that there are no more than + # insertmanyvalues_max_parameters params. + # Currently used by SQL Server, which limits statements to 2100 bound + # parameters (actually 2099). + max_params = self.dialect.insertmanyvalues_max_parameters + if max_params: + total_num_of_params = len(self.bind_names) + num_params_per_batch = len(imv.insert_crud_params) + num_params_outside_of_batch = ( + total_num_of_params - num_params_per_batch + ) + batch_size = min( + batch_size, + ( + (max_params - num_params_outside_of_batch) + // num_params_per_batch + ), + ) + + batches = list(parameters) + + processed_setinputsizes = None + batchnum = 1 + total_batches = lenparams // batch_size + ( + 1 if lenparams % batch_size else 0 + ) + + insert_crud_params = imv.insert_crud_params + assert insert_crud_params is not None + + escaped_bind_names: Mapping[str, str] + if not self.positional: + if self.escaped_bind_names: + escaped_bind_names = self.escaped_bind_names + else: + escaped_bind_names = {} + + all_keys = set(parameters[0]) + + escaped_insert_crud_params: Sequence[Any] = [ + (escaped_bind_names.get(col.key, col.key), formatted) + for col, _, formatted in insert_crud_params + ] + + keys_to_replace = all_keys.intersection( + key for key, _ in escaped_insert_crud_params + ) + base_parameters = { + key: parameters[0][key] + for key in all_keys.difference(keys_to_replace) + } + executemany_values_w_comma = "" + else: + escaped_insert_crud_params = () + keys_to_replace = set() + base_parameters = {} + executemany_values_w_comma = f"({imv.single_values_expr}), " + + while batches: + batch = batches[0:batch_size] + batches[0:batch_size] = [] + + if generic_setinputsizes: + # if setinputsizes is present, expand this collection to + # suit the batch length as well + # currently this will be mssql+pyodbc for internal dialects + processed_setinputsizes = [ + (new_key, len_, typ) + for new_key, len_, typ in ( + (f"{key}_{index}", len_, typ) + for index in range(len(batch)) + for key, len_, typ in generic_setinputsizes + ) + ] + + replaced_parameters: Any + if self.positional: + # the assumption here is that any parameters that are not + # in the VALUES clause are expected to be parameterized + # expressions in the RETURNING (or maybe ON CONFLICT) clause. + # So based on + # which sequence comes first in the compiler's INSERT + # statement tells us where to expand the parameters. + + # otherwise we probably shouldn't be doing insertmanyvalues + # on the statement. 
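The batch size computation above reduces to simple arithmetic; a standalone
sketch using SQL Server's cap of 2099 bound parameters (the column counts
are illustrative)::

    # mirrors the computation in _deliver_insertmanyvalues_batches
    page_size = 1000             # insertmanyvalues_page_size default
    max_params = 2099            # insertmanyvalues_max_parameters
    total_num_of_params = 3      # bound parameters in the whole statement
    num_params_per_batch = 3     # bound parameters in one VALUES tuple
    num_params_outside_of_batch = (
        total_num_of_params - num_params_per_batch
    )

    batch_size = min(
        page_size,
        (max_params - num_params_outside_of_batch) // num_params_per_batch,
    )
    print(batch_size)  # 699; the parameter cap shrinks the 1000-row page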
+ + num_ins_params = imv.num_positional_params_counted + + if num_ins_params == len(batch[0]): + extra_params = () + batch_iterator: Iterable[Tuple[Any, ...]] = batch + elif self.returning_precedes_values: + extra_params = batch[0][:-num_ins_params] + batch_iterator = (b[-num_ins_params:] for b in batch) + else: + extra_params = batch[0][num_ins_params:] + batch_iterator = (b[:num_ins_params] for b in batch) + + replaced_statement = statement.replace( + "__EXECMANY_TOKEN__", + (executemany_values_w_comma * len(batch))[:-2], + ) + + replaced_parameters = tuple( + itertools.chain.from_iterable(batch_iterator) + ) + if self.returning_precedes_values: + replaced_parameters = extra_params + replaced_parameters + else: + replaced_parameters = replaced_parameters + extra_params + else: + replaced_values_clauses = [] + replaced_parameters = base_parameters.copy() + + for i, param in enumerate(batch): + new_tokens = [ + formatted.replace(key, f"{key}__{i}") + if key in param + else formatted + for key, formatted in escaped_insert_crud_params + ] + replaced_values_clauses.append( + f"({', '.join(new_tokens)})" + ) + + replaced_parameters.update( + {f"{key}__{i}": param[key] for key in keys_to_replace} + ) + + replaced_statement = statement.replace( + "__EXECMANY_TOKEN__", + ", ".join(replaced_values_clauses), + ) + + yield ( + replaced_statement, + replaced_parameters, + processed_setinputsizes, + batchnum, + total_batches, + ) + batchnum += 1 + def visit_insert(self, insert_stmt, **kw): + compile_state = insert_stmt._compile_state_factory( insert_stmt, self, **kw ) @@ -4529,9 +4775,24 @@ class SQLCompiler(Compiled): } ) + positiontup_before = positiontup_after = 0 + + # for positional, insertmanyvalues needs to know how many + # bound parameters are in the VALUES sequence; there's no simple + # rule because default expressions etc. can have zero or more + # params inside them. 
After multiple attempts to figure this out, + # this very simplistic "count before, then count after" works and is + # likely the least amount of callcounts, though looks clumsy + if self.positiontup: + positiontup_before = len(self.positiontup) + crud_params_struct = crud._get_crud_params( self, insert_stmt, compile_state, toplevel, **kw ) + + if self.positiontup: + positiontup_after = len(self.positiontup) + crud_params_single = crud_params_struct.single_params if ( @@ -4584,14 +4845,34 @@ class SQLCompiler(Compiled): ) if self.implicit_returning or insert_stmt._returning: + + # if returning clause is rendered first, capture bound parameters + # while visiting and place them prior to the VALUES oriented + # bound parameters, when using positional parameter scheme + rpv = self.returning_precedes_values + flip_pt = rpv and self.positional + if flip_pt: + pt: Optional[List[str]] = self.positiontup + temp_pt: Optional[List[str]] + self.positiontup = temp_pt = [] + else: + temp_pt = pt = None + returning_clause = self.returning_clause( insert_stmt, self.implicit_returning or insert_stmt._returning, populate_result_map=toplevel, ) - if self.returning_precedes_values: + if flip_pt: + if TYPE_CHECKING: + assert temp_pt is not None + assert pt is not None + self.positiontup = temp_pt + pt + + if rpv: text += " " + returning_clause + else: returning_clause = None @@ -4614,6 +4895,18 @@ class SQLCompiler(Compiled): text += " %s" % select_text elif not crud_params_single and supports_default_values: text += " DEFAULT VALUES" + if toplevel and self._insert_stmt_should_use_insertmanyvalues( + insert_stmt + ): + self._insertmanyvalues = _InsertManyValues( + True, + self.dialect.default_metavalue_token, + cast( + "List[Tuple[KeyedColumnElement[Any], str, str]]", + crud_params_single, + ), + (positiontup_after - positiontup_before), + ) elif compile_state._has_multi_parameters: text += " VALUES %s" % ( ", ".join( @@ -4623,6 +4916,8 @@ class SQLCompiler(Compiled): ) ) else: + # TODO: why is third element of crud_params_single not str + # already? insert_single_values_expr = ", ".join( [ value @@ -4631,9 +4926,20 @@ class SQLCompiler(Compiled): ) ] ) + text += " VALUES (%s)" % insert_single_values_expr - if toplevel: - self.insert_single_values_expr = insert_single_values_expr + if toplevel and self._insert_stmt_should_use_insertmanyvalues( + insert_stmt + ): + self._insertmanyvalues = _InsertManyValues( + False, + insert_single_values_expr, + cast( + "List[Tuple[KeyedColumnElement[Any], str, str]]", + crud_params_single, + ), + positiontup_after - positiontup_before, + ) if insert_stmt._post_values_clause is not None: post_values_clause = self.process( diff --git a/lib/sqlalchemy/sql/crud.py b/lib/sqlalchemy/sql/crud.py index 81151a26b..b13377a59 100644 --- a/lib/sqlalchemy/sql/crud.py +++ b/lib/sqlalchemy/sql/crud.py @@ -32,6 +32,7 @@ from . import coercions from . import dml from . import elements from . 
import roles +from .dml import isinsert as _compile_state_isinsert from .elements import ColumnClause from .schema import default_is_clause_element from .schema import default_is_sequence @@ -73,19 +74,18 @@ def _as_dml_column(c: ColumnElement[Any]) -> ColumnClause[Any]: return c -class _CrudParams(NamedTuple): - single_params: Sequence[ - Tuple[ColumnElement[Any], str, Optional[Union[str, _SQLExprDefault]]] - ] - all_multi_params: List[ - Sequence[ - Tuple[ - ColumnClause[Any], - str, - str, - ] - ] +_CrudParamSequence = Sequence[ + Tuple[ + "ColumnElement[Any]", + str, + Optional[Union[str, "_SQLExprDefault"]], ] +] + + +class _CrudParams(NamedTuple): + single_params: _CrudParamSequence + all_multi_params: List[Sequence[Tuple[ColumnClause[Any], str, str]]] def _get_crud_params( @@ -144,6 +144,12 @@ def _get_crud_params( compiler._get_bind_name_for_col = _col_bind_name + if stmt._returning and stmt._return_defaults: + raise exc.CompileError( + "Can't compile statement that includes returning() and " + "return_defaults() simultaneously" + ) + # no parameters in the statement, no parameters in the # compiled params - return binds for all columns if compiler.column_keys is None and compile_state._no_parameters: @@ -164,7 +170,10 @@ def _get_crud_params( ] spd: Optional[MutableMapping[_DMLColumnElement, Any]] - if compile_state._has_multi_parameters: + if ( + _compile_state_isinsert(compile_state) + and compile_state._has_multi_parameters + ): mp = compile_state._multi_parameters assert mp is not None spd = mp[0] @@ -227,7 +236,7 @@ def _get_crud_params( kw, ) - if compile_state.isinsert and stmt._select_names: + if _compile_state_isinsert(compile_state) and stmt._select_names: # is an insert from select, is not a multiparams assert not compile_state._has_multi_parameters @@ -272,7 +281,10 @@ def _get_crud_params( % (", ".join("%s" % (c,) for c in check)) ) - if compile_state._has_multi_parameters: + if ( + _compile_state_isinsert(compile_state) + and compile_state._has_multi_parameters + ): # is a multiparams, is not an insert from a select assert not stmt._select_names multi_extended_values = _extend_values_for_multiparams( @@ -297,7 +309,7 @@ def _get_crud_params( ( _as_dml_column(stmt.table.columns[0]), compiler.preparer.format_column(stmt.table.columns[0]), - "DEFAULT", + compiler.dialect.default_metavalue_token, ) ] @@ -500,7 +512,7 @@ def _scan_insert_from_select_cols( ins_from_select = ins_from_select._generate() # copy raw_columns ins_from_select._raw_columns = list(ins_from_select._raw_columns) + [ - expr for col, col_expr, expr in add_select_cols + expr for _, _, expr in add_select_cols ] compiler.stack[-1]["insert_from_select"] = ins_from_select @@ -539,7 +551,8 @@ def _scan_cols( else: cols = stmt.table.columns - if compile_state.isinsert and not compile_state._has_multi_parameters: + isinsert = _compile_state_isinsert(compile_state) + if isinsert and not compile_state._has_multi_parameters: # new rules for #7998. fetch lastrowid or implicit returning # for autoincrement column even if parameter is NULL, for DBs that # override NULL param for primary key (sqlite, mysql/mariadb) @@ -575,7 +588,7 @@ def _scan_cols( kw, ) - elif compile_state.isinsert: + elif isinsert: # no parameter is present and it's an insert. 
if c.primary_key and need_pks: @@ -683,7 +696,8 @@ def _append_param_parameter( value, required=value is REQUIRED, name=_col_bind_name(c) - if not compile_state._has_multi_parameters + if not _compile_state_isinsert(compile_state) + or not compile_state._has_multi_parameters else "%s_m0" % _col_bind_name(c), **kw, ) @@ -706,7 +720,8 @@ def _append_param_parameter( c, value, name=_col_bind_name(c) - if not compile_state._has_multi_parameters + if not _compile_state_isinsert(compile_state) + or not compile_state._has_multi_parameters else "%s_m0" % _col_bind_name(c), **kw, ) @@ -922,11 +937,19 @@ def _append_param_insert_select_hasdefault( not c.default.optional or not compiler.dialect.sequences_optional ): values.append( - (c, compiler.preparer.format_column(c), c.default.next_value()) + ( + c, + compiler.preparer.format_column(c), + c.default.next_value(), + ) ) elif default_is_clause_element(c.default): values.append( - (c, compiler.preparer.format_column(c), c.default.arg.self_group()) + ( + c, + compiler.preparer.format_column(c), + c.default.arg.self_group(), + ) ) else: values.append( @@ -1105,14 +1128,10 @@ def _process_multiparam_default_bind( return compiler.process(c.default, **kw) else: col = _multiparam_column(c, index) - if isinstance(stmt, dml.Insert): - return _create_insert_prefetch_bind_param( - compiler, col, process=True, **kw - ) - else: - return _create_update_prefetch_bind_param( - compiler, col, process=True, **kw - ) + assert isinstance(stmt, dml.Insert) + return _create_insert_prefetch_bind_param( + compiler, col, process=True, **kw + ) def _get_update_multitable_params( @@ -1205,13 +1224,7 @@ def _extend_values_for_multiparams( mp = compile_state._multi_parameters assert mp is not None for i, row in enumerate(mp[1:]): - extension: List[ - Tuple[ - ColumnClause[Any], - str, - str, - ] - ] = [] + extension: List[Tuple[ColumnClause[Any], str, str]] = [] row = {_column_as_key(key): v for key, v in row.items()} @@ -1292,7 +1305,7 @@ def _get_returning_modifiers(compiler, stmt, compile_state, toplevel): """ need_pks = ( toplevel - and compile_state.isinsert + and _compile_state_isinsert(compile_state) and not stmt._inline and ( not compiler.for_executemany @@ -1348,7 +1361,7 @@ def _get_returning_modifiers(compiler, stmt, compile_state, toplevel): if implicit_returning: postfetch_lastrowid = False - if compile_state.isinsert: + if _compile_state_isinsert(compile_state): implicit_return_defaults = implicit_returning and stmt._return_defaults elif compile_state.isupdate: implicit_return_defaults = ( diff --git a/lib/sqlalchemy/sql/dml.py b/lib/sqlalchemy/sql/dml.py index eb612f394..a08e38800 100644 --- a/lib/sqlalchemy/sql/dml.py +++ b/lib/sqlalchemy/sql/dml.py @@ -118,7 +118,6 @@ class DMLState(CompileState): ] = None _ordered_values: Optional[List[Tuple[_DMLColumnElement, Any]]] = None _parameter_ordering: Optional[List[_DMLColumnElement]] = None - _has_multi_parameters = False _primary_table: FromClause _supports_implicit_returning = True @@ -202,64 +201,10 @@ class DMLState(CompileState): froms.extend(all_tables[1:]) return primary_table, froms - def _process_multi_values(self, statement: ValuesBase) -> None: - if not statement._supports_multi_parameters: - raise exc.InvalidRequestError( - "%s construct does not support " - "multiple parameter sets." % statement.__visit_name__.upper() - ) - else: - assert isinstance(statement, Insert) - - # which implies... 
- # assert isinstance(statement.table, TableClause) - - for parameters in statement._multi_values: - multi_parameters: List[MutableMapping[_DMLColumnElement, Any]] = [ - { - c.key: value - for c, value in zip(statement.table.c, parameter_set) - } - if isinstance(parameter_set, collections_abc.Sequence) - else parameter_set - for parameter_set in parameters - ] - - if self._no_parameters: - self._no_parameters = False - self._has_multi_parameters = True - self._multi_parameters = multi_parameters - self._dict_parameters = self._multi_parameters[0] - elif not self._has_multi_parameters: - self._cant_mix_formats_error() - else: - assert self._multi_parameters - self._multi_parameters.extend(multi_parameters) - def _process_values(self, statement: ValuesBase) -> None: if self._no_parameters: - self._has_multi_parameters = False self._dict_parameters = statement._values self._no_parameters = False - elif self._has_multi_parameters: - self._cant_mix_formats_error() - - def _process_ordered_values(self, statement: ValuesBase) -> None: - parameters = statement._ordered_values - - if self._no_parameters: - self._no_parameters = False - assert parameters is not None - self._dict_parameters = dict(parameters) - self._ordered_values = parameters - self._parameter_ordering = [key for key, value in parameters] - elif self._has_multi_parameters: - self._cant_mix_formats_error() - else: - raise exc.InvalidRequestError( - "Can only invoke ordered_values() once, and not mixed " - "with any other values() call" - ) def _process_select_values(self, statement: ValuesBase) -> None: assert statement._select_names is not None @@ -276,6 +221,12 @@ class DMLState(CompileState): # does not allow this construction to occur assert False, "This statement already has parameters" + def _no_multi_values_supported(self, statement: ValuesBase) -> NoReturn: + raise exc.InvalidRequestError( + "%s construct does not support " + "multiple parameter sets." 
% statement.__visit_name__.upper()
+        )
+
     def _cant_mix_formats_error(self) -> NoReturn:
         raise exc.InvalidRequestError(
             "Can't mix single and multiple VALUES "
@@ -291,6 +242,8 @@ class InsertDMLState(DMLState):

     include_table_with_column_exprs = False

+    _has_multi_parameters = False
+
     def __init__(
         self,
         statement: Insert,
@@ -320,6 +273,37 @@ class InsertDMLState(DMLState):
             for col in self._dict_parameters or ()
         ]

+    def _process_values(self, statement: ValuesBase) -> None:
+        if self._no_parameters:
+            self._has_multi_parameters = False
+            self._dict_parameters = statement._values
+            self._no_parameters = False
+        elif self._has_multi_parameters:
+            self._cant_mix_formats_error()
+
+    def _process_multi_values(self, statement: ValuesBase) -> None:
+        for parameters in statement._multi_values:
+            multi_parameters: List[MutableMapping[_DMLColumnElement, Any]] = [
+                {
+                    c.key: value
+                    for c, value in zip(statement.table.c, parameter_set)
+                }
+                if isinstance(parameter_set, collections_abc.Sequence)
+                else parameter_set
+                for parameter_set in parameters
+            ]
+
+            if self._no_parameters:
+                self._no_parameters = False
+                self._has_multi_parameters = True
+                self._multi_parameters = multi_parameters
+                self._dict_parameters = self._multi_parameters[0]
+            elif not self._has_multi_parameters:
+                self._cant_mix_formats_error()
+            else:
+                assert self._multi_parameters
+                self._multi_parameters.extend(multi_parameters)
+

 @CompileState.plugin_for("default", "update")
 class UpdateDMLState(DMLState):
@@ -336,7 +320,7 @@ class UpdateDMLState(DMLState):
         elif statement._values is not None:
             self._process_values(statement)
         elif statement._multi_values:
-            self._process_multi_values(statement)
+            self._no_multi_values_supported(statement)
         t, ef = self._make_extra_froms(statement)
         self._primary_table = t
         self._extra_froms = ef
@@ -347,6 +331,21 @@ class UpdateDMLState(DMLState):
             mt and compiler.render_table_with_column_in_update_from
         )

+    def _process_ordered_values(self, statement: ValuesBase) -> None:
+        parameters = statement._ordered_values
+
+        if self._no_parameters:
+            self._no_parameters = False
+            assert parameters is not None
+            self._dict_parameters = dict(parameters)
+            self._ordered_values = parameters
+            self._parameter_ordering = [key for key, value in parameters]
+        else:
+            raise exc.InvalidRequestError(
+                "Can only invoke ordered_values() once, and not mixed "
+                "with any other values() call"
+            )
+

 @CompileState.plugin_for("default", "delete")
 class DeleteDMLState(DMLState):
@@ -897,18 +896,68 @@ class ValuesBase(UpdateBase):
         return self

     @_generative
-    @_exclusive_against(
-        "_returning",
-        msgs={
-            "_returning": "RETURNING is already configured on this statement"
-        },
-        defaults={"_returning": _returning},
-    )
     def return_defaults(
         self: SelfValuesBase, *cols: _DMLColumnArgument
     ) -> SelfValuesBase:
         """Make use of a :term:`RETURNING` clause for the purpose
-        of fetching server-side expressions and defaults.
+        of fetching server-side expressions and defaults, for supporting
+        backends only.
+
+        .. tip::
+
+            The :meth:`.ValuesBase.return_defaults` method is used by the ORM
+            for its internal work in fetching newly generated primary key
+            and server default values, in particular to provide the underlying
+            implementation of the :paramref:`_orm.Mapper.eager_defaults`
+            ORM feature.  Its behavior is fairly idiosyncratic
+            and is not really intended for general use.  End users should
+            stick with using :meth:`.UpdateBase.returning` in order to
+            add RETURNING clauses to their INSERT, UPDATE and DELETE
+            statements.
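With the ``_exclusive_against`` guard removed from ``return_defaults()``,
the conflict between the two methods is now detected during compilation by
the check added in ``crud._get_crud_params``; a minimal sketch::

    from sqlalchemy import Column, Integer, MetaData, String, Table, insert
    from sqlalchemy.exc import CompileError

    t = Table(
        "t",
        MetaData(),
        Column("id", Integer, primary_key=True),
        Column("x", String(50)),
    )

    stmt = insert(t).returning(t.c.id).return_defaults()
    try:
        stmt.compile()
    except CompileError as err:
        # Can't compile statement that includes returning() and
        # return_defaults() simultaneously
        print(err)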
+ + Normally, a single row INSERT statement will automatically populate the + :attr:`.CursorResult.inserted_primary_key` attribute when executed, + which stores the primary key of the row that was just inserted in the + form of a :class:`.Row` object with column names as named tuple keys + (and the :attr:`.Row._mapping` view fully populated as well). The + dialect in use chooses the strategy to use in order to populate this + data; if it was generated using server-side defaults and / or SQL + expressions, dialect-specific approaches such as ``cursor.lastrowid`` + or ``RETURNING`` are typically used to acquire the new primary key + value. + + However, when the statement is modified by calling + :meth:`.ValuesBase.return_defaults` before executing the statement, + additional behaviors take place **only** for backends that support + RETURNING and for :class:`.Table` objects that maintain the + :paramref:`.Table.implicit_returning` parameter at its default value of + ``True``. In these cases, when the :class:`.CursorResult` is returned + from the statement's execution, not only will + :attr:`.CursorResult.inserted_primary_key` be populated as always, the + :attr:`.CursorResult.returned_defaults` attribute will also be + populated with a :class:`.Row` named-tuple representing the full range + of server generated + values from that single row, including values for any columns that + specify :paramref:`_schema.Column.server_default` or which make use of + :paramref:`_schema.Column.default` using a SQL expression. + + When invoking INSERT statements with multiple rows using + :ref:`insertmanyvalues <engine_insertmanyvalues>`, the + :meth:`.ValuesBase.return_defaults` modifier will have the effect of + the :attr:`_engine.CursorResult.inserted_primary_key_rows` and + :attr:`_engine.CursorResult.returned_defaults_rows` attributes being + fully populated with lists of :class:`.Row` objects representing newly + inserted primary key values as well as newly inserted server generated + values for each row inserted. The + :attr:`.CursorResult.inserted_primary_key` and + :attr:`.CursorResult.returned_defaults` attributes will also continue + to be populated with the first row of these two collections. + + If the backend does not support RETURNING or the :class:`.Table` in use + has disabled :paramref:`.Table.implicit_returning`, then no RETURNING + clause is added and no additional data is fetched, however the + INSERT or UPDATE statement proceeds normally. + E.g.:: @@ -918,64 +967,58 @@ class ValuesBase(UpdateBase): server_created_at = result.returned_defaults['created_at'] - When used against a backend that supports RETURNING, all column - values generated by SQL expression or server-side-default will be - added to any existing RETURNING clause, provided that - :meth:`.UpdateBase.returning` is not used simultaneously. The column - values will then be available on the result using the - :attr:`_engine.CursorResult.returned_defaults` accessor as - a dictionary, - referring to values keyed to the :class:`_schema.Column` - object as well as - its ``.key``. - - This method differs from :meth:`.UpdateBase.returning` in these ways: - - 1. :meth:`.ValuesBase.return_defaults` is only intended for use with an - INSERT or an UPDATE statement that matches exactly one row per - parameter set. While the RETURNING construct in the general sense - supports multiple rows for a multi-row UPDATE or DELETE statement, - or for special cases of INSERT that return multiple rows (e.g. 
- INSERT from SELECT, multi-valued VALUES clause), - :meth:`.ValuesBase.return_defaults` is intended only for an - "ORM-style" single-row INSERT/UPDATE statement. The row - returned by the statement is also consumed implicitly when - :meth:`.ValuesBase.return_defaults` is used. By contrast, - :meth:`.UpdateBase.returning` leaves the RETURNING result-set intact - with a collection of any number of rows. - - 2. It is compatible with the existing logic to fetch auto-generated - primary key values, also known as "implicit returning". Backends - that support RETURNING will automatically make use of RETURNING in - order to fetch the value of newly generated primary keys; while the - :meth:`.UpdateBase.returning` method circumvents this behavior, - :meth:`.ValuesBase.return_defaults` leaves it intact. - - 3. It can be called against any backend. Backends that don't support - RETURNING will skip the usage of the feature, rather than raising - an exception. The return value of - :attr:`_engine.CursorResult.returned_defaults` will be ``None`` + + The :meth:`.ValuesBase.return_defaults` method is mutually exclusive + against the :meth:`.UpdateBase.returning` method and errors will be + raised during the SQL compilation process if both are used at the same + time on one statement. The RETURNING clause of the INSERT or UPDATE + statement is therefore controlled by only one of these methods at a + time. + + The :meth:`.ValuesBase.return_defaults` method differs from + :meth:`.UpdateBase.returning` in these ways: + + 1. :meth:`.ValuesBase.return_defaults` method causes the + :attr:`.CursorResult.returned_defaults` collection to be populated + with the first row from the RETURNING result. This attribute is not + populated when using :meth:`.UpdateBase.returning`. + + 2. :meth:`.ValuesBase.return_defaults` is compatible with existing + logic used to fetch auto-generated primary key values that are then + populated into the :attr:`.CursorResult.inserted_primary_key` + attribute. By contrast, using :meth:`.UpdateBase.returning` will + have the effect of the :attr:`.CursorResult.inserted_primary_key` + attribute being left unpopulated. + + 3. :meth:`.ValuesBase.return_defaults` can be called against any + backend. Backends that don't support RETURNING will skip the usage + of the feature, rather than raising an exception. The return value + of :attr:`_engine.CursorResult.returned_defaults` will be ``None`` + for backends that don't support RETURNING or for which the target + :class:`.Table` sets :paramref:`.Table.implicit_returning` to + ``False``. 4. An INSERT statement invoked with executemany() is supported if the backend database driver supports the - ``insert_executemany_returning`` feature, currently this includes - PostgreSQL with psycopg2. When executemany is used, the + :ref:`insertmanyvalues <engine_insertmanyvalues>` + feature which is now supported by most SQLAlchemy-included backends. + When executemany is used, the :attr:`_engine.CursorResult.returned_defaults_rows` and :attr:`_engine.CursorResult.inserted_primary_key_rows` accessors will return the inserted defaults and primary keys. - .. versionadded:: 1.4 + .. versionadded:: 1.4 Added + :attr:`_engine.CursorResult.returned_defaults_rows` and + :attr:`_engine.CursorResult.inserted_primary_key_rows` accessors. + In version 2.0, the underlying implementation which fetches and + populates the data for these attributes was generalized to be + supported by most backends, whereas in 1.4 they were only + supported by the ``psycopg2`` driver. 
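A short sketch of the generalized executemany behavior noted above, again
assuming a SQLite build with RETURNING support (3.35+)::

    from sqlalchemy import (
        Column,
        Integer,
        MetaData,
        String,
        Table,
        create_engine,
    )

    metadata = MetaData()
    t = Table(
        "t",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("x", String(50)),
    )

    engine = create_engine("sqlite://")
    metadata.create_all(engine)

    with engine.begin() as conn:
        result = conn.execute(
            t.insert().return_defaults(),
            [{"x": "a"}, {"x": "b"}],
        )
        # one Row per inserted row when insertmanyvalues takes effect
        print(result.inserted_primary_key_rows)  # [(1,), (2,)]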
- :meth:`.ValuesBase.return_defaults` is used by the ORM to provide - an efficient implementation for the ``eager_defaults`` feature of - :class:`_orm.Mapper`. :param cols: optional list of column key names or - :class:`_schema.Column` - objects. If omitted, all column expressions evaluated on the server - are added to the returning list. - - .. versionadded:: 0.9.0 + :class:`_schema.Column` that acts as a filter for those columns that + will be fetched. .. seealso:: diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index 3320214a2..565537109 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -647,8 +647,9 @@ class Table( server side defaults, on those backends which support RETURNING. In modern SQLAlchemy there is generally no reason to alter this - setting, except in the case of some backends such as SQL Server - when INSERT triggers are used for that table. + setting, except for some backend specific cases + (see :ref:`mssql_triggers` in the SQL Server dialect documentation + for one such example). :param include_columns: A list of strings indicating a subset of columns to be loaded via the ``autoload`` operation; table columns who @@ -3158,7 +3159,6 @@ class ScalarElementColumnDefault(ColumnDefault): ) -# _SQLExprDefault = Union["ColumnElement[Any]", "TextClause", "SelectBase"] _SQLExprDefault = Union["ColumnElement[Any]", "TextClause"] diff --git a/lib/sqlalchemy/sql/util.py b/lib/sqlalchemy/sql/util.py index 0400ab3fe..55c6a35f8 100644 --- a/lib/sqlalchemy/sql/util.py +++ b/lib/sqlalchemy/sql/util.py @@ -598,6 +598,21 @@ class _repr_row(_repr_base): ) +class _long_statement(str): + def __str__(self) -> str: + lself = len(self) + if lself > 500: + lleft = 250 + lright = 100 + trunc = lself - lleft - lright + return ( + f"{self[0:lleft]} ... {trunc} " + f"characters truncated ... {self[-lright:]}" + ) + else: + return str.__str__(self) + + class _repr_params(_repr_base): """Provide a string view of bound parameters. 
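The truncation rule in ``_long_statement`` can be restated in isolation; a
standalone sketch of the same thresholds (the class itself is private API)::

    statement = "SELECT " + ", ".join(f"col{i}" for i in range(100))

    # over 500 characters, keep the first 250 and the last 100, noting
    # how many characters were dropped in between
    lself = len(statement)
    if lself > 500:
        lleft, lright = 250, 100
        trunc = lself - lleft - lright
        display = (
            f"{statement[0:lleft]} ... {trunc} "
            f"characters truncated ... {statement[-lright:]}"
        )
    else:
        display = statement

    print(len(statement), len(display))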
@@ -606,12 +621,13 @@ class _repr_params(_repr_base): """ - __slots__ = "params", "batches", "ismulti" + __slots__ = "params", "batches", "ismulti", "max_params" def __init__( self, params: Optional[_AnyExecuteParams], batches: int, + max_params: int = 100, max_chars: int = 300, ismulti: Optional[bool] = None, ): @@ -619,6 +635,7 @@ class _repr_params(_repr_base): self.ismulti = ismulti self.batches = batches self.max_chars = max_chars + self.max_params = max_params def __repr__(self) -> str: if self.ismulti is None: @@ -693,29 +710,110 @@ class _repr_params(_repr_base): else: return "(%s)" % elements + def _get_batches(self, params: Iterable[Any]) -> Any: + + lparams = list(params) + lenparams = len(lparams) + if lenparams > self.max_params: + lleft = self.max_params // 2 + return ( + lparams[0:lleft], + lparams[-lleft:], + lenparams - self.max_params, + ) + else: + return lparams, None, None + def _repr_params( self, params: _AnySingleExecuteParams, typ: int, ) -> str: - trunc = self.trunc if typ is self._DICT: - return "{%s}" % ( + return self._repr_param_dict( + cast("_CoreSingleExecuteParams", params) + ) + elif typ is self._TUPLE: + return self._repr_param_tuple(cast("Sequence[Any]", params)) + else: + return self._repr_param_list(params) + + def _repr_param_dict(self, params: _CoreSingleExecuteParams) -> str: + trunc = self.trunc + ( + items_first_batch, + items_second_batch, + trunclen, + ) = self._get_batches(params.items()) + + if items_second_batch: + text = "{%s" % ( ", ".join( - "%r: %s" % (key, trunc(value)) - for key, value in cast( - "_CoreSingleExecuteParams", params - ).items() + f"{key!r}: {trunc(value)}" + for key, value in items_first_batch ) ) - elif typ is self._TUPLE: - seq_params = cast("Sequence[Any]", params) - return "(%s%s)" % ( - ", ".join(trunc(value) for value in seq_params), - "," if len(seq_params) == 1 else "", + text += f" ... {trunclen} parameters truncated ... " + text += "%s}" % ( + ", ".join( + f"{key!r}: {trunc(value)}" + for key, value in items_second_batch + ) ) else: - return "[%s]" % (", ".join(trunc(value) for value in params)) + text = "{%s}" % ( + ", ".join( + f"{key!r}: {trunc(value)}" + for key, value in items_first_batch + ) + ) + return text + + def _repr_param_tuple(self, params: "Sequence[Any]") -> str: + trunc = self.trunc + + ( + items_first_batch, + items_second_batch, + trunclen, + ) = self._get_batches(params) + + if items_second_batch: + text = "(%s" % ( + ", ".join(trunc(value) for value in items_first_batch) + ) + text += f" ... {trunclen} parameters truncated ... " + text += "%s)" % ( + ", ".join(trunc(value) for value in items_second_batch), + ) + else: + text = "(%s%s)" % ( + ", ".join(trunc(value) for value in items_first_batch), + "," if len(items_first_batch) == 1 else "", + ) + return text + + def _repr_param_list(self, params: _AnySingleExecuteParams) -> str: + trunc = self.trunc + ( + items_first_batch, + items_second_batch, + trunclen, + ) = self._get_batches(params) + + if items_second_batch: + text = "[%s" % ( + ", ".join(trunc(value) for value in items_first_batch) + ) + text += f" ... {trunclen} parameters truncated ... 
" + text += "%s]" % ( + ", ".join(trunc(value) for value in items_second_batch) + ) + else: + text = "[%s]" % ( + ", ".join(trunc(value) for value in items_first_batch) + ) + return text def adapt_criterion_to_null(crit: _CE, nulls: Collection[Any]) -> _CE: diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py index 7ba89b505..a8650f222 100644 --- a/lib/sqlalchemy/testing/provision.py +++ b/lib/sqlalchemy/testing/provision.py @@ -459,3 +459,19 @@ def set_default_schema_on_connection(cfg, dbapi_connection, schema_name): "backend does not implement a schema name set function: %s" % (cfg.db.url,) ) + + +@register.init +def upsert(cfg, table, returning, set_lambda=None): + """return the backends insert..on conflict / on dupe etc. construct. + + while we should add a backend-neutral upsert construct as well, such as + insert().upsert(), it's important that we continue to test the + backend-specific insert() constructs since if we do implement + insert().upsert(), that would be using a different codepath for the things + we need to test like insertmanyvalues, etc. + + """ + raise NotImplementedError( + f"backend does not include an upsert implementation: {cfg.db.url}" + ) diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index 874383394..3a0fc818d 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -424,6 +424,15 @@ class SuiteRequirements(Requirements): ) @property + def insertmanyvalues(self): + return exclusions.only_if( + lambda config: config.db.dialect.supports_multivalues_insert + and config.db.dialect.insert_returning + and config.db.dialect.use_insertmanyvalues, + "%(database)s %(does_support)s 'insertmanyvalues functionality", + ) + + @property def tuple_in(self): """Target platform supports the syntax "(x, y) IN ((x1, y1), (x2, y2), ...)" diff --git a/lib/sqlalchemy/testing/suite/test_dialect.py b/lib/sqlalchemy/testing/suite/test_dialect.py index bb2dd6574..efad81930 100644 --- a/lib/sqlalchemy/testing/suite/test_dialect.py +++ b/lib/sqlalchemy/testing/suite/test_dialect.py @@ -11,6 +11,7 @@ from .. import fixtures from .. import is_true from .. import ne_ from .. 
import provide_metadata +from ..assertions import expect_raises from ..assertions import expect_raises_message from ..config import requirements from ..provision import set_default_schema_on_connection @@ -412,3 +413,156 @@ class DifficultParametersTest(fixtures.TestBase): # name works as the key from cursor.description eq_(row._mapping[name], "some name") + + +class ReturningGuardsTest(fixtures.TablesTest): + """test that the various 'returning' flags are set appropriately""" + + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + + Table( + "t", + metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("data", String(50)), + ) + + @testing.fixture + def run_stmt(self, connection): + t = self.tables.t + + def go(stmt, executemany, id_param_name, expect_success): + stmt = stmt.returning(t.c.id) + + if executemany: + if not expect_success: + # for RETURNING executemany(), we raise our own + # error as this is independent of general RETURNING + # support + with expect_raises_message( + exc.StatementError, + rf"Dialect {connection.dialect.name}\+" + f"{connection.dialect.driver} with " + f"current server capabilities does not support " + f".*RETURNING when executemany is used", + ): + result = connection.execute( + stmt, + [ + {id_param_name: 1, "data": "d1"}, + {id_param_name: 2, "data": "d2"}, + {id_param_name: 3, "data": "d3"}, + ], + ) + else: + result = connection.execute( + stmt, + [ + {id_param_name: 1, "data": "d1"}, + {id_param_name: 2, "data": "d2"}, + {id_param_name: 3, "data": "d3"}, + ], + ) + eq_(result.all(), [(1,), (2,), (3,)]) + else: + if not expect_success: + # for RETURNING execute(), we pass all the way to the DB + # and let it fail + with expect_raises(exc.DBAPIError): + connection.execute( + stmt, {id_param_name: 1, "data": "d1"} + ) + else: + result = connection.execute( + stmt, {id_param_name: 1, "data": "d1"} + ) + eq_(result.all(), [(1,)]) + + return go + + def test_insert_single(self, connection, run_stmt): + t = self.tables.t + + stmt = t.insert() + + run_stmt(stmt, False, "id", connection.dialect.insert_returning) + + def test_insert_many(self, connection, run_stmt): + t = self.tables.t + + stmt = t.insert() + + run_stmt( + stmt, True, "id", connection.dialect.insert_executemany_returning + ) + + def test_update_single(self, connection, run_stmt): + t = self.tables.t + + connection.execute( + t.insert(), + [ + {"id": 1, "data": "d1"}, + {"id": 2, "data": "d2"}, + {"id": 3, "data": "d3"}, + ], + ) + + stmt = t.update().where(t.c.id == bindparam("b_id")) + + run_stmt(stmt, False, "b_id", connection.dialect.update_returning) + + def test_update_many(self, connection, run_stmt): + t = self.tables.t + + connection.execute( + t.insert(), + [ + {"id": 1, "data": "d1"}, + {"id": 2, "data": "d2"}, + {"id": 3, "data": "d3"}, + ], + ) + + stmt = t.update().where(t.c.id == bindparam("b_id")) + + run_stmt( + stmt, True, "b_id", connection.dialect.update_executemany_returning + ) + + def test_delete_single(self, connection, run_stmt): + t = self.tables.t + + connection.execute( + t.insert(), + [ + {"id": 1, "data": "d1"}, + {"id": 2, "data": "d2"}, + {"id": 3, "data": "d3"}, + ], + ) + + stmt = t.delete().where(t.c.id == bindparam("b_id")) + + run_stmt(stmt, False, "b_id", connection.dialect.delete_returning) + + def test_delete_many(self, connection, run_stmt): + t = self.tables.t + + connection.execute( + t.insert(), + [ + {"id": 1, "data": "d1"}, + {"id": 2, "data": "d2"}, + {"id": 3, "data": "d3"}, + ], + ) + + stmt = 
t.delete().where(t.c.id == bindparam("b_id")) + + run_stmt( + stmt, True, "b_id", connection.dialect.delete_executemany_returning + ) diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py index 2307d3b3f..ae54f6bcd 100644 --- a/lib/sqlalchemy/testing/suite/test_insert.py +++ b/lib/sqlalchemy/testing/suite/test_insert.py @@ -338,6 +338,7 @@ class ReturningTest(fixtures.TablesTest): r = connection.execute( table.insert().returning(table.c.id), dict(data="some data") ) + pk = r.first()[0] fetched_pk = connection.scalar(select(table.c.id)) eq_(fetched_pk, pk) @@ -357,5 +358,25 @@ class ReturningTest(fixtures.TablesTest): pk = connection.scalar(select(self.tables.autoinc_pk.c.id)) eq_(r.inserted_primary_key, (pk,)) + @requirements.insert_executemany_returning + def test_insertmanyvalues_returning(self, connection): + r = connection.execute( + self.tables.autoinc_pk.insert().returning( + self.tables.autoinc_pk.c.id + ), + [ + {"data": "d1"}, + {"data": "d2"}, + {"data": "d3"}, + {"data": "d4"}, + {"data": "d5"}, + ], + ) + rall = r.all() + + pks = connection.execute(select(self.tables.autoinc_pk.c.id)) + + eq_(rall, pks.all()) + __all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest") diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index 59e9cc7f4..7d79c67ae 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -164,6 +164,26 @@ class PercentSchemaNamesTest(fixtures.TablesTest): ) self._assert_table(connection) + @requirements.insert_executemany_returning + def test_executemany_returning_roundtrip(self, connection): + percent_table = self.tables.percent_table + connection.execute( + percent_table.insert(), {"percent%": 5, "spaces % more spaces": 12} + ) + result = connection.execute( + percent_table.insert().returning( + percent_table.c["percent%"], + percent_table.c["spaces % more spaces"], + ), + [ + {"percent%": 7, "spaces % more spaces": 11}, + {"percent%": 9, "spaces % more spaces": 10}, + {"percent%": 11, "spaces % more spaces": 9}, + ], + ) + eq_(result.all(), [(7, 11), (9, 10), (11, 9)]) + self._assert_table(connection) + def _assert_table(self, conn): percent_table = self.tables.percent_table lightweight_percent_table = self.tables.lightweight_percent_table |
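The guards exercised by these tests key off the same per-dialect flags that
an application may consult at runtime; a closing sketch, assuming the
bundled SQLite dialect::

    from sqlalchemy import create_engine

    engine = create_engine("sqlite://")
    with engine.connect() as conn:
        d = conn.dialect
        # the executemany RETURNING capabilities tested above
        print(d.insert_executemany_returning)
        print(d.update_executemany_returning)
        print(d.delete_executemany_returning)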
