author    Mike Bayer <mike_mp@zzzcomputing.com>    2022-07-18 15:08:37 -0400
committer Mike Bayer <mike_mp@zzzcomputing.com>    2022-09-24 11:15:32 -0400
commit    2bcc97da424eef7db9a5d02f81d02344925415ee (patch)
tree      13d4f04bc7dd40a0207f86aa2fc3a3b49e065674 /lib/sqlalchemy
parent    332188e5680574368001ded52eb0a9d259ecdef5 (diff)
download  sqlalchemy-2bcc97da424eef7db9a5d02f81d02344925415ee.tar.gz
implement batched INSERT..VALUES () () for executemany
The feature is enabled for all built-in backends when RETURNING is used,
except for Oracle, which doesn't need it; on psycopg2 and mssql+pyodbc it
is used for all INSERT statements, not just those that use RETURNING.
Third-party dialects would need to opt in to the new feature by setting
use_insertmanyvalues to True. Also adds dialect-level guards against using
RETURNING with executemany where we don't have an implementation to suit
it; a single execute() with RETURNING still defers to the server without
us checking.

Fixes: #6047
Fixes: #7907
Change-Id: I3936d3c00003f02e322f2e43fb949d0e6e568304
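As a quick illustration of what this commit enables, below is a minimal
sketch (the table and in-memory SQLite URL are illustrative; SQLite 3.35+
is assumed for RETURNING support) of an executemany-style
INSERT..RETURNING that the new handler rewrites into batched multi-row
INSERT..VALUES statements::

    from sqlalchemy import (
        Column, Integer, MetaData, String, Table, create_engine
    )

    engine = create_engine("sqlite://")
    metadata = MetaData()
    t = Table(
        "t",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("x", String),
    )
    metadata.create_all(engine)

    with engine.begin() as conn:
        # a list of parameter dicts ("executemany" style) combined with
        # RETURNING; with use_insertmanyvalues enabled this is rewritten
        # into batched INSERT .. VALUES (), (), ... statements
        result = conn.execute(
            t.insert().returning(t.c.id),
            [{"x": "a"}, {"x": "b"}, {"x": "c"}],
        )
        print(result.all())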
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py50
-rw-r--r--lib/sqlalchemy/dialects/mssql/pyodbc.py46
-rw-r--r--lib/sqlalchemy/dialects/mysql/base.py2
-rw-r--r--lib/sqlalchemy/dialects/mysql/provision.py17
-rw-r--r--lib/sqlalchemy/dialects/oracle/cx_oracle.py2
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py2
-rw-r--r--lib/sqlalchemy/dialects/postgresql/base.py21
-rw-r--r--lib/sqlalchemy/dialects/postgresql/provision.py18
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2.py201
-rw-r--r--lib/sqlalchemy/dialects/sqlite/base.py12
-rw-r--r--lib/sqlalchemy/dialects/sqlite/provision.py16
-rw-r--r--lib/sqlalchemy/engine/base.py217
-rw-r--r--lib/sqlalchemy/engine/create.py28
-rw-r--r--lib/sqlalchemy/engine/default.py160
-rw-r--r--lib/sqlalchemy/engine/interfaces.py218
-rw-r--r--lib/sqlalchemy/orm/context.py4
-rw-r--r--lib/sqlalchemy/orm/persistence.py2
-rw-r--r--lib/sqlalchemy/orm/session.py55
-rw-r--r--lib/sqlalchemy/sql/compiler.py350
-rw-r--r--lib/sqlalchemy/sql/crud.py93
-rw-r--r--lib/sqlalchemy/sql/dml.py267
-rw-r--r--lib/sqlalchemy/sql/schema.py6
-rw-r--r--lib/sqlalchemy/sql/util.py124
-rw-r--r--lib/sqlalchemy/testing/provision.py16
-rw-r--r--lib/sqlalchemy/testing/requirements.py9
-rw-r--r--lib/sqlalchemy/testing/suite/test_dialect.py154
-rw-r--r--lib/sqlalchemy/testing/suite/test_insert.py21
-rw-r--r--lib/sqlalchemy/testing/suite/test_results.py20
28 files changed, 1658 insertions, 473 deletions
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index c7e88a643..085a2c27b 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -250,6 +250,19 @@ The process for fetching this value has several variants:
INSERT INTO t (x) OUTPUT inserted.id VALUES (?)
+ As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also
+ used by default to optimize many-row INSERT statements; for SQL Server
+ the feature takes place for both RETURNING and non-RETURNING
+ INSERT statements.
+
+* The value of :paramref:`_sa.create_engine.insertmanyvalues_page_size`
+ defaults to 1000, however the ultimate page size for a particular INSERT
+ statement may be limited further, based on an observed limit of
+ 2100 bound parameters for a single statement in SQL Server.
+ The page size may also be modified on a per-engine
+ or per-statement basis; see the section
+ :ref:`engine_insertmanyvalues_page_size` for details.
+
* When RETURNING is not available or has been disabled via
``implicit_returning=False``, either the ``scope_identity()`` function or
the ``@@identity`` variable is used; behavior varies by backend:
@@ -258,9 +271,13 @@ The process for fetching this value has several variants:
appended to the end of the INSERT statement; a second result set will be
fetched in order to receive the value. Given a table as::
- t = Table('t', m, Column('id', Integer, primary_key=True),
- Column('x', Integer),
- implicit_returning=False)
+ t = Table(
+ 't',
+ metadata,
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer),
+ implicit_returning=False
+ )
an INSERT will look like:
@@ -731,6 +748,8 @@ compatibility level information. Because of this, if running under
a backwards compatibility mode SQLAlchemy may attempt to use T-SQL
statements that are unable to be parsed by the database server.
+.. _mssql_triggers:
+
Triggers
--------
@@ -755,9 +774,6 @@ Declarative form::
__table_args__ = {'implicit_returning':False}
-This option can also be specified engine-wide using the
-``implicit_returning=False`` argument on :func:`_sa.create_engine`.
-
.. _mssql_rowcount_versioning:
Rowcount Support / ORM Versioning
@@ -2846,6 +2862,12 @@ class MSDialect(default.DefaultDialect):
supports_empty_insert = False
supports_comments = True
+ supports_default_metavalue = False
+ """dialect supports INSERT... VALUES (DEFAULT) syntax -
+ SQL Server **does** support this, but **not** for the IDENTITY column,
+ so we can't turn this on.
+
+ """
# supports_native_uuid is partial here, so we implement our
# own impl type
@@ -2892,6 +2914,19 @@ class MSDialect(default.DefaultDialect):
non_native_boolean_check_constraint = False
supports_unicode_binds = True
postfetch_lastrowid = True
+
+ # may be changed at server inspection time for older SQL server versions
+ supports_multivalues_insert = True
+
+ use_insertmanyvalues = True
+
+ use_insertmanyvalues_wo_returning = True
+
+ # "The incoming request has too many parameters. The server supports a "
+ # "maximum of 2100 parameters."
+ # in fact you can have 2099 parameters.
+ insertmanyvalues_max_parameters = 2099
+
_supports_offset_fetch = False
_supports_nvarchar_max = False
@@ -3054,6 +3089,9 @@ class MSDialect(default.DefaultDialect):
if self.server_version_info >= MS_2008_VERSION:
self.supports_multivalues_insert = True
+ else:
+ self.supports_multivalues_insert = False
+
if self.deprecate_large_types is None:
self.deprecate_large_types = (
self.server_version_info >= MS_2012_VERSION
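Tying together the page-size documentation added to this file, here is a
minimal sketch of per-engine and per-statement configuration (the
connection URL, table ``t`` and ``parameter_sets`` list are
illustrative)::

    from sqlalchemy import create_engine

    # per-engine: lower the default page size of 1000
    engine = create_engine(
        "mssql+pyodbc://scott:tiger@mssql2017:1433/test"
        "?driver=ODBC+Driver+17+for+SQL+Server",
        insertmanyvalues_page_size=500,
    )

    # per-statement: override via a connection execution option
    with engine.begin() as conn:
        conn.execution_options(insertmanyvalues_page_size=200).execute(
            t.insert(),
            parameter_sets,  # illustrative list of parameter dictionaries
        )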
diff --git a/lib/sqlalchemy/dialects/mssql/pyodbc.py b/lib/sqlalchemy/dialects/mssql/pyodbc.py
index 22e385865..2eef971cc 100644
--- a/lib/sqlalchemy/dialects/mssql/pyodbc.py
+++ b/lib/sqlalchemy/dialects/mssql/pyodbc.py
@@ -289,23 +289,34 @@ versioning.
Fast Executemany Mode
---------------------
-The Pyodbc driver has added support for a "fast executemany" mode of execution
+.. note:: SQLAlchemy 2.0 now includes an equivalent "fast executemany"
+ handler for INSERT statements that is more robust than the PyODBC feature;
+ the feature is called :ref:`insertmanyvalues <engine_insertmanyvalues>`
+ and is enabled by default for all INSERT statements used with SQL Server.
+ SQLAlchemy's feature integrates with the PyODBC ``setinputsizes()`` method
+ which allows for more accurate specification of datatypes, and additionally
+ uses a dynamically sized, batched approach that scales to any number of
+ columns and/or rows.
+
+ The SQL Server ``fast_executemany`` parameter may be used at the same time
+ as ``insertmanyvalues`` is enabled; however, the parameter will rarely be
+ used, as INSERT statements invoked via Core :class:`.Insert` constructs,
+ as well as all ORM use, no longer use the ``.executemany()`` DBAPI
+ cursor method.
+
+The PyODBC driver includes support for a "fast executemany" mode of execution
which greatly reduces round trips for a DBAPI ``executemany()`` call when using
Microsoft ODBC drivers, for **limited size batches that fit in memory**. The
-feature is enabled by setting the flag ``.fast_executemany`` on the DBAPI
-cursor when an executemany call is to be used. The SQLAlchemy pyodbc SQL
-Server dialect supports setting this flag automatically when the
-``.fast_executemany`` flag is passed to
-:func:`_sa.create_engine` ; note that the ODBC driver must be the Microsoft
-driver in order to use this flag::
+feature is enabled by setting the attribute ``.fast_executemany`` on the DBAPI
+cursor when an executemany call is to be used. The SQLAlchemy PyODBC SQL
+Server dialect supports this parameter by passing the
+``fast_executemany`` parameter to
+:func:`_sa.create_engine` , when using the **Microsoft ODBC driver only**::
engine = create_engine(
- "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server",
+ "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
fast_executemany=True)
-.. warning:: The pyodbc fast_executemany mode **buffers all rows in memory** and is
- not compatible with very large batches of data. A future version of SQLAlchemy
- may support this flag as a per-execution option instead.
.. versionadded:: 1.3
@@ -319,11 +330,13 @@ driver in order to use this flag::
Setinputsizes Support
-----------------------
-As of version 2.0, the pyodbc ``cursor.setinputsizes()`` method is used by
-default except for .executemany() calls when fast_executemany=True.
+As of version 2.0, the pyodbc ``cursor.setinputsizes()`` method is used for
+all statement executions, except for ``cursor.executemany()`` calls when
+``fast_executemany=True`` where it is not supported (assuming
+:ref:`insertmanyvalues <engine_insertmanyvalues>` is kept enabled,
+``fast_executemany`` will not take place for INSERT statements in any case).
-The behavior of setinputsizes can be customized, as may be necessary
-particularly if fast_executemany is in use, via the
+The behavior of setinputsizes can be customized via the
:meth:`.DialectEvents.do_setinputsizes` hook. See that method for usage
examples.
@@ -331,7 +344,8 @@ examples.
unless ``use_setinputsizes=True`` is passed.
.. versionchanged:: 2.0 The mssql+pyodbc dialect now defaults to using
- setinputsizes except for .executemany() calls when fast_executemany=True.
+ setinputsizes for all statement executions with the exception of
+ cursor.executemany() calls when fast_executemany=True.
""" # noqa
diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py
index c0521f61e..a3e99514b 100644
--- a/lib/sqlalchemy/dialects/mysql/base.py
+++ b/lib/sqlalchemy/dialects/mysql/base.py
@@ -2402,6 +2402,8 @@ class MySQLDialect(default.DefaultDialect):
supports_default_values = False
supports_default_metavalue = True
+ use_insertmanyvalues: bool = True
+
supports_sane_rowcount = True
supports_sane_multi_rowcount = False
supports_multivalues_insert = True
diff --git a/lib/sqlalchemy/dialects/mysql/provision.py b/lib/sqlalchemy/dialects/mysql/provision.py
index c73875fec..36a5e9f54 100644
--- a/lib/sqlalchemy/dialects/mysql/provision.py
+++ b/lib/sqlalchemy/dialects/mysql/provision.py
@@ -6,6 +6,7 @@ from ...testing.provision import create_db
from ...testing.provision import drop_db
from ...testing.provision import generate_driver_url
from ...testing.provision import temp_table_keyword_args
+from ...testing.provision import upsert
@generate_driver_url.for_db("mysql", "mariadb")
@@ -78,3 +79,19 @@ def _mysql_drop_db(cfg, eng, ident):
@temp_table_keyword_args.for_db("mysql", "mariadb")
def _mysql_temp_table_keyword_args(cfg, eng):
return {"prefixes": ["TEMPORARY"]}
+
+
+@upsert.for_db("mariadb")
+def _upsert(cfg, table, returning, set_lambda=None):
+ from sqlalchemy.dialects.mysql import insert
+
+ stmt = insert(table)
+
+ if set_lambda:
+ stmt = stmt.on_duplicate_key_update(**set_lambda(stmt.inserted))
+ else:
+ pk1 = table.primary_key.c[0]
+ stmt = stmt.on_duplicate_key_update({pk1.key: pk1})
+
+ stmt = stmt.returning(*returning)
+ return stmt
diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
index d9fb5c827..4571f51f7 100644
--- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py
+++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
@@ -897,6 +897,8 @@ class OracleDialect_cx_oracle(OracleDialect):
supports_sane_multi_rowcount = True
insert_executemany_returning = True
+ update_executemany_returning = True
+ delete_executemany_returning = True
bind_typing = interfaces.BindTyping.SETINPUTSIZES
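The two flags added above indicate that cx_Oracle supports RETURNING with
executemany for UPDATE and DELETE as well, via its native executemany
handling rather than insertmanyvalues. A hedged sketch of the pattern the
flags permit (table ``t``, ``engine`` and the bind names are
illustrative)::

    from sqlalchemy import bindparam, update

    stmt = (
        update(t)
        .where(t.c.id == bindparam("b_id"))
        .values(x=bindparam("b_x"))
        .returning(t.c.id)
    )

    with engine.begin() as conn:
        # a multi-parameter-set execution; permitted because the dialect
        # reports update_executemany_returning = True
        result = conn.execute(
            stmt, [{"b_id": 1, "b_x": 5}, {"b_id": 2, "b_x": 6}]
        )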
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index c953d3447..96bac59d9 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -418,7 +418,6 @@ class AsyncAdapt_asyncpg_cursor:
"description",
"arraysize",
"rowcount",
- "_inputsizes",
"_cursor",
"_invalidate_schema_cache_asof",
)
@@ -433,7 +432,6 @@ class AsyncAdapt_asyncpg_cursor:
self.description = None
self.arraysize = 1
self.rowcount = -1
- self._inputsizes = None
self._invalidate_schema_cache_asof = 0
def close(self):
diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py
index 18a7c0a86..3e43d601f 100644
--- a/lib/sqlalchemy/dialects/postgresql/base.py
+++ b/lib/sqlalchemy/dialects/postgresql/base.py
@@ -1330,7 +1330,6 @@ from typing import List
from typing import Optional
from . import array as _array
-from . import dml
from . import hstore as _hstore
from . import json as _json
from . import pg_catalog
@@ -1850,24 +1849,6 @@ class PGCompiler(compiler.SQLCompiler):
return target_text
- @util.memoized_property
- def _is_safe_for_fast_insert_values_helper(self):
- # don't allow fast executemany if _post_values_clause is
- # present and is not an OnConflictDoNothing. what this means
- # concretely is that the
- # "fast insert executemany helper" won't be used, in other
- # words we won't convert "executemany()" of many parameter
- # sets into a single INSERT with many elements in VALUES.
- # We can't apply that optimization safely if for example the
- # statement includes a clause like "ON CONFLICT DO UPDATE"
-
- return self.insert_single_values_expr is not None and (
- self.statement._post_values_clause is None
- or isinstance(
- self.statement._post_values_clause, dml.OnConflictDoNothing
- )
- )
-
def visit_on_conflict_do_nothing(self, on_conflict, **kw):
target_text = self._on_conflict_target(on_conflict, **kw)
@@ -2804,6 +2785,7 @@ class PGDialect(default.DefaultDialect):
sequences_optional = True
preexecute_autoincrement_sequences = True
postfetch_lastrowid = False
+ use_insertmanyvalues = True
supports_comments = True
supports_constraint_comments = True
@@ -2813,6 +2795,7 @@ class PGDialect(default.DefaultDialect):
supports_empty_insert = False
supports_multivalues_insert = True
+
supports_identity_columns = True
default_paramstyle = "pyformat"
diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py
index 0d17f28e0..8dd8a4995 100644
--- a/lib/sqlalchemy/dialects/postgresql/provision.py
+++ b/lib/sqlalchemy/dialects/postgresql/provision.py
@@ -14,6 +14,7 @@ from ...testing.provision import log
from ...testing.provision import prepare_for_drop_tables
from ...testing.provision import set_default_schema_on_connection
from ...testing.provision import temp_table_keyword_args
+from ...testing.provision import upsert
@create_db.for_db("postgresql")
@@ -125,3 +126,20 @@ def prepare_for_drop_tables(config, connection):
"idle in transaction: %s"
% ("; ".join(row._mapping["query"] for row in rows))
)
+
+
+@upsert.for_db("postgresql")
+def _upsert(cfg, table, returning, set_lambda=None):
+ from sqlalchemy.dialects.postgresql import insert
+
+ stmt = insert(table)
+
+ if set_lambda:
+ stmt = stmt.on_conflict_do_update(
+ index_elements=table.primary_key, set_=set_lambda(stmt.excluded)
+ )
+ else:
+ stmt = stmt.on_conflict_do_nothing()
+
+ stmt = stmt.returning(*returning)
+ return stmt
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index a01f20e99..350f4b616 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -230,25 +230,23 @@ Modern versions of psycopg2 include a feature known as
`Fast Execution Helpers \
<https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
have been shown in benchmarking to improve psycopg2's executemany()
-performance, primarily with INSERT statements, by multiple orders of magnitude.
-SQLAlchemy internally makes use of these extensions for ``executemany()`` style
-calls, which correspond to lists of parameters being passed to
-:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
-sets <tutorial_multiple_parameters>`. The ORM also uses this mode internally whenever
-possible.
-
-The two available extensions on the psycopg2 side are the ``execute_values()``
-and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
-``execute_values()`` extension for all qualifying INSERT statements.
-
-.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
- ``"values_only"`` for ``executemany_mode``, which allows an order of
- magnitude performance improvement for INSERT statements, but does not
- include "batch" mode for UPDATE and DELETE statements which removes the
- ability of ``cursor.rowcount`` to function correctly.
-
-The use of these extensions is controlled by the ``executemany_mode`` flag
-which may be passed to :func:`_sa.create_engine`::
+performance, primarily with INSERT statements, by at least
+an order of magnitude.
+
+SQLAlchemy implements a native form of the "insert many values"
+handler that will rewrite a single-row INSERT statement to accommodate
+many values at once within an extended VALUES clause; this handler is
+equivalent to psycopg2's ``execute_values()`` handler; an overview of this
+feature and its configuration are at :ref:`engine_insertmanyvalues`.
+
+.. versionadded:: 2.0 Replaced psycopg2's ``execute_values()`` fast execution
+ helper with a native SQLAlchemy mechanism referred to as
+ :ref:`insertmanyvalues <engine_insertmanyvalues>`.
+
+The psycopg2 dialect retains the ability to use the psycopg2-specific
+``execute_batch()`` feature, although it is not expected that this is a widely
+used feature. The use of this extension may be enabled using the
+``executemany_mode`` flag which may be passed to :func:`_sa.create_engine`::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
@@ -257,59 +255,55 @@ which may be passed to :func:`_sa.create_engine`::
Possible options for ``executemany_mode`` include:
-* ``values_only`` - this is the default value. the psycopg2 execute_values()
- extension is used for qualifying INSERT statements, which rewrites the INSERT
- to include multiple VALUES clauses so that many parameter sets can be
- inserted with one statement.
-
- .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
- which is also now the default.
-
-* ``None`` - No psycopg2 extensions are not used, and the usual
- ``cursor.executemany()`` method is used when invoking statements with
- multiple parameter sets.
-
-* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
- INSERT, UPDATE and DELETE statements, so that multiple copies
- of a SQL query, each one corresponding to a parameter set passed to
- ``executemany()``, are joined into a single SQL string separated by a
- semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
- attribute will not contain a value for executemany-style executions.
-
-* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT
- statements, ``execute_batch`` is used for UPDATE and DELETE.
- When using this mode, the :attr:`_engine.CursorResult.rowcount`
+* ``values_only`` - this is the default value. SQLAlchemy's native
+ :ref:`insertmanyvalues <engine_insertmanyvalues>` handler is used for qualifying
+ INSERT statements, assuming
+ :paramref:`_sa.create_engine.use_insertmanyvalues` is left at
+ its default value of ``True``. This handler rewrites simple
+ INSERT statements to include multiple VALUES clauses so that many
+ parameter sets can be inserted with one statement.
+
+* ``'values_plus_batch'`` - SQLAlchemy's native
+ :ref:`insertmanyvalues <engine_insertmanyvalues>` handler is used for qualifying
+ INSERT statements, assuming
+ :paramref:`_sa.create_engine.use_insertmanyvalues` is left at its default
+ value of ``True``. Then, psycopg2's ``execute_batch()`` handler is used for
+ qualifying UPDATE and DELETE statements when executed with multiple parameter
+ sets. When using this mode, the :attr:`_engine.CursorResult.rowcount`
attribute will not contain a value for executemany-style executions against
UPDATE and DELETE statements.
-By "qualifying statements", we mean that the statement being executed
-must be a Core :func:`_expression.insert`, :func:`_expression.update`
-or :func:`_expression.delete` construct, and not a plain textual SQL
-string or one constructed using :func:`_expression.text`. When using the
-ORM, all insert/update/delete statements used by the ORM flush process
+.. versionchanged:: 2.0 Removed the ``'batch'`` and ``'None'`` options
+ from psycopg2 ``executemany_mode``. Control over batching for INSERT
+ statements is now configured via the
+ :paramref:`_sa.create_engine.use_insertmanyvalues` engine-level parameter.
+
+The term "qualifying statements" refers to the statement being executed
+being a Core :func:`_expression.insert`, :func:`_expression.update`
+or :func:`_expression.delete` construct, and **not** a plain textual SQL
+string or one constructed using :func:`_expression.text`. It also may **not** be
+a special "extension" statement such as an "ON CONFLICT" "upsert" statement.
+When using the ORM, all insert/update/delete statements used by the ORM flush process
are qualifying.
-The "page size" for the "values" and "batch" strategies can be affected
-by using the ``executemany_batch_page_size`` and
-``executemany_values_page_size`` engine parameters. These
-control how many parameter sets
-should be represented in each execution. The "values" page size defaults
-to 1000, which is different that psycopg2's default. The "batch" page
-size defaults to 100. These can be affected by passing new values to
-:func:`_engine.create_engine`::
+The "page size" for the psycopg2 "batch" strategy can be affected
+by using the ``executemany_batch_page_size`` parameter, which defaults to
+100.
+
+For the "insertmanyvalues" feature, the page size can be controlled using the
+:paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter,
+which defaults to 1000. An example of modifying both parameters
+is below::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
- executemany_mode='values',
- executemany_values_page_size=10000, executemany_batch_page_size=500)
-
-.. versionchanged:: 1.4
-
- The default for ``executemany_values_page_size`` is now 1000, up from
- 100.
+ executemany_mode='values_plus_batch',
+ insertmanyvalues_page_size=5000, executemany_batch_page_size=500)
.. seealso::
+ :ref:`engine_insertmanyvalues` - background on "insertmanyvalues"
+
:ref:`tutorial_multiple_parameters` - General information on using the
:class:`_engine.Connection`
object to execute statements in such a way as to make
@@ -484,13 +478,11 @@ from typing import cast
from . import ranges
from ._psycopg_common import _PGDialect_common_psycopg
from ._psycopg_common import _PGExecutionContext_common_psycopg
-from .base import PGCompiler
from .base import PGIdentifierPreparer
from .json import JSON
from .json import JSONB
from ... import types as sqltypes
from ... import util
-from ...engine import cursor as _cursor
from ...util import FastIntFlag
from ...util import parse_user_argument_for_enum
@@ -561,22 +553,6 @@ class PGExecutionContext_psycopg2(_PGExecutionContext_common_psycopg):
_psycopg2_fetched_rows = None
def post_exec(self):
- if (
- self._psycopg2_fetched_rows
- and self.compiled
- and self.compiled.effective_returning
- ):
- # psycopg2 execute_values will provide for a real cursor where
- # cursor.description works correctly. however, it executes the
- # INSERT statement multiple times for multiple pages of rows, so
- # while this cursor also supports calling .fetchall() directly, in
- # order to get the list of all rows inserted across multiple pages,
- # we have to retrieve the aggregated list from the execute_values()
- # function directly.
- strat_cls = _cursor.FullyBufferedCursorFetchStrategy
- self.cursor_fetch_strategy = strat_cls(
- self.cursor, initial_buffer=self._psycopg2_fetched_rows
- )
self._log_notices(self.cursor)
def _log_notices(self, cursor):
@@ -597,24 +573,16 @@ class PGExecutionContext_psycopg2(_PGExecutionContext_common_psycopg):
cursor.connection.notices[:] = []
-class PGCompiler_psycopg2(PGCompiler):
- pass
-
-
class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
pass
class ExecutemanyMode(FastIntFlag):
- EXECUTEMANY_PLAIN = 0
- EXECUTEMANY_BATCH = 1
- EXECUTEMANY_VALUES = 2
- EXECUTEMANY_VALUES_PLUS_BATCH = EXECUTEMANY_BATCH | EXECUTEMANY_VALUES
+ EXECUTEMANY_VALUES = 0
+ EXECUTEMANY_VALUES_PLUS_BATCH = 1
(
- EXECUTEMANY_PLAIN,
- EXECUTEMANY_BATCH,
EXECUTEMANY_VALUES,
EXECUTEMANY_VALUES_PLUS_BATCH,
) = tuple(ExecutemanyMode)
@@ -630,9 +598,9 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
# set to true based on psycopg2 version
supports_sane_multi_rowcount = False
execution_ctx_cls = PGExecutionContext_psycopg2
- statement_compiler = PGCompiler_psycopg2
preparer = PGIdentifierPreparer_psycopg2
psycopg2_version = (0, 0)
+ use_insertmanyvalues_wo_returning = True
_has_native_hstore = True
@@ -655,7 +623,6 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
self,
executemany_mode="values_only",
executemany_batch_page_size=100,
- executemany_values_page_size=1000,
**kwargs,
):
_PGDialect_common_psycopg.__init__(self, **kwargs)
@@ -665,19 +632,13 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
self.executemany_mode = parse_user_argument_for_enum(
executemany_mode,
{
- EXECUTEMANY_PLAIN: [None],
- EXECUTEMANY_BATCH: ["batch"],
EXECUTEMANY_VALUES: ["values_only"],
- EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
+ EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch"],
},
"executemany_mode",
)
- if self.executemany_mode & EXECUTEMANY_VALUES:
- self.insert_executemany_returning = True
-
self.executemany_batch_page_size = executemany_batch_page_size
- self.executemany_values_page_size = executemany_values_page_size
if self.dbapi and hasattr(self.dbapi, "__version__"):
m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
@@ -699,14 +660,8 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
is not None
)
- # PGDialect.initialize() checks server version for <= 8.2 and sets
- # this flag to False if so
- if not self.insert_returning:
- self.insert_executemany_returning = False
- self.executemany_mode = EXECUTEMANY_PLAIN
-
- self.supports_sane_multi_rowcount = not (
- self.executemany_mode & EXECUTEMANY_BATCH
+ self.supports_sane_multi_rowcount = (
+ self.executemany_mode is not EXECUTEMANY_VALUES_PLUS_BATCH
)
@classmethod
@@ -806,39 +761,7 @@ class PGDialect_psycopg2(_PGDialect_common_psycopg):
return None
def do_executemany(self, cursor, statement, parameters, context=None):
- if (
- self.executemany_mode & EXECUTEMANY_VALUES
- and context
- and context.isinsert
- and context.compiled._is_safe_for_fast_insert_values_helper
- ):
- executemany_values = (
- "(%s)" % context.compiled.insert_single_values_expr
- )
-
- # guard for statement that was altered via event hook or similar
- if executemany_values not in statement:
- executemany_values = None
- else:
- executemany_values = None
-
- if executemany_values:
- statement = statement.replace(executemany_values, "%s")
- if self.executemany_values_page_size:
- kwargs = {"page_size": self.executemany_values_page_size}
- else:
- kwargs = {}
- xtras = self._psycopg2_extras
- context._psycopg2_fetched_rows = xtras.execute_values(
- cursor,
- statement,
- parameters,
- template=executemany_values,
- fetch=bool(context.compiled.effective_returning),
- **kwargs,
- )
-
- elif self.executemany_mode & EXECUTEMANY_BATCH:
+ if self.executemany_mode is EXECUTEMANY_VALUES_PLUS_BATCH:
if self.executemany_batch_page_size:
kwargs = {"page_size": self.executemany_batch_page_size}
else:
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index 88c6dbe18..e57a84fe0 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -1936,6 +1936,7 @@ class SQLiteDialect(default.DefaultDialect):
supports_empty_insert = False
supports_cast = True
supports_multivalues_insert = True
+ use_insertmanyvalues = True
tuple_in_values = True
supports_statement_cache = True
insert_null_pk_still_autoincrements = True
@@ -1944,6 +1945,13 @@ class SQLiteDialect(default.DefaultDialect):
delete_returning = True
update_returning_multifrom = True
+ supports_default_metavalue = True
+ """dialect supports INSERT... VALUES (DEFAULT) syntax"""
+
+ default_metavalue_token = "NULL"
+ """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
+ parenthesis."""
+
default_paramstyle = "qmark"
execution_ctx_cls = SQLiteExecutionContext
statement_compiler = SQLiteCompiler
@@ -2055,6 +2063,10 @@ class SQLiteDialect(default.DefaultDialect):
self.delete_returning
) = self.insert_returning = False
+ if self.dbapi.sqlite_version_info < (3, 32, 0):
+ # https://www.sqlite.org/limits.html
+ self.insertmanyvalues_max_parameters = 999
+
_isolation_lookup = util.immutabledict(
{"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
)
diff --git a/lib/sqlalchemy/dialects/sqlite/provision.py b/lib/sqlalchemy/dialects/sqlite/provision.py
index a590f9f03..05ee6c625 100644
--- a/lib/sqlalchemy/dialects/sqlite/provision.py
+++ b/lib/sqlalchemy/dialects/sqlite/provision.py
@@ -14,6 +14,7 @@ from ...testing.provision import post_configure_engine
from ...testing.provision import run_reap_dbs
from ...testing.provision import stop_test_class_outside_fixtures
from ...testing.provision import temp_table_keyword_args
+from ...testing.provision import upsert
# TODO: I can't get this to build dynamically with pytest-xdist procs
@@ -142,3 +143,18 @@ def _reap_sqlite_dbs(url, idents):
if os.path.exists(path):
log.info("deleting SQLite database file: %s" % path)
os.remove(path)
+
+
+@upsert.for_db("sqlite")
+def _upsert(cfg, table, returning, set_lambda=None):
+ from sqlalchemy.dialects.sqlite import insert
+
+ stmt = insert(table)
+
+ if set_lambda:
+ stmt = stmt.on_conflict_do_update(set_=set_lambda(stmt.excluded))
+ else:
+ stmt = stmt.on_conflict_do_nothing()
+
+ stmt = stmt.returning(*returning)
+ return stmt
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 2b9cf602a..1b07acab5 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -12,6 +12,7 @@ import typing
from typing import Any
from typing import Callable
from typing import cast
+from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
@@ -29,6 +30,7 @@ from .interfaces import BindTyping
from .interfaces import ConnectionEventsTarget
from .interfaces import DBAPICursor
from .interfaces import ExceptionContext
+from .interfaces import ExecuteStyle
from .interfaces import ExecutionContext
from .util import _distill_params_20
from .util import _distill_raw_params
@@ -438,6 +440,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
:ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
describing the ORM version of ``yield_per``
+ :param insertmanyvalues_page_size: number of rows to format into an
+ INSERT statement when the statement uses "insertmanyvalues" mode,
+ which is a paged form of bulk insert that is used for many backends
+ when using :term:`executemany` execution typically in conjunction
+ with RETURNING. Defaults to 1000. May also be modified on a
+ per-engine basis using the
+ :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
:param schema_translate_map: Available on: :class:`_engine.Connection`,
:class:`_engine.Engine`, :class:`_sql.Executable`.
@@ -1795,8 +1811,39 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
context.pre_exec()
+ if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
+ return self._exec_insertmany_context(
+ dialect,
+ context,
+ )
+ else:
+ return self._exec_single_context(
+ dialect, context, statement, parameters
+ )
+
+ def _exec_single_context(
+ self,
+ dialect: Dialect,
+ context: ExecutionContext,
+ statement: Union[str, Compiled],
+ parameters: Optional[_AnyMultiExecuteParams],
+ ) -> CursorResult[Any]:
+ """continue the _execute_context() method for a single DBAPI
+ cursor.execute() or cursor.executemany() call.
+
+ """
if dialect.bind_typing is BindTyping.SETINPUTSIZES:
- context._set_input_sizes()
+ generic_setinputsizes = context._prepare_set_input_sizes()
+
+ if generic_setinputsizes:
+ try:
+ dialect.do_set_input_sizes(
+ context.cursor, generic_setinputsizes, context
+ )
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e, str(statement), parameters, None, context
+ )
cursor, str_statement, parameters = (
context.cursor,
@@ -1840,13 +1887,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
)
else:
self._log_info(
- "[%s] [SQL parameters hidden due to hide_parameters=True]"
- % (stats,)
+ "[%s] [SQL parameters hidden due to hide_parameters=True]",
+ stats,
)
evt_handled: bool = False
try:
- if context.executemany:
+ if context.execute_style is ExecuteStyle.EXECUTEMANY:
effective_parameters = cast(
"_CoreMultiExecuteParams", effective_parameters
)
@@ -1862,7 +1909,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
break
if not evt_handled:
self.dialect.do_executemany(
- cursor, str_statement, effective_parameters, context
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
)
elif not effective_parameters and context.no_parameters:
if self.dialect._has_events:
@@ -1914,6 +1964,151 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return result
+ def _exec_insertmany_context(
+ self,
+ dialect: Dialect,
+ context: ExecutionContext,
+ ) -> CursorResult[Any]:
+ """continue the _execute_context() method for an "insertmanyvalues"
+ operation, which will invoke DBAPI
+ cursor.execute() one or more times with individual log and
+ event hook calls.
+
+ """
+
+ if dialect.bind_typing is BindTyping.SETINPUTSIZES:
+ generic_setinputsizes = context._prepare_set_input_sizes()
+ else:
+ generic_setinputsizes = None
+
+ cursor, str_statement, parameters = (
+ context.cursor,
+ context.statement,
+ context.parameters,
+ )
+
+ effective_parameters = parameters
+
+ engine_events = self._has_events or self.engine._has_events
+ if self.dialect._has_events:
+ do_execute_dispatch: Iterable[
+ Any
+ ] = self.dialect.dispatch.do_execute
+ else:
+ do_execute_dispatch = ()
+
+ if self._echo:
+ stats = context._get_cache_stats() + " (insertmanyvalues)"
+ for (
+ sub_stmt,
+ sub_params,
+ setinputsizes,
+ batchnum,
+ totalbatches,
+ ) in dialect._deliver_insertmanyvalues_batches(
+ cursor,
+ str_statement,
+ effective_parameters,
+ generic_setinputsizes,
+ context,
+ ):
+
+ if setinputsizes:
+ try:
+ dialect.do_set_input_sizes(
+ context.cursor, setinputsizes, context
+ )
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e,
+ sql_util._long_statement(sub_stmt),
+ sub_params,
+ None,
+ context,
+ )
+
+ if engine_events:
+ for fn in self.dispatch.before_cursor_execute:
+ sub_stmt, sub_params = fn(
+ self,
+ cursor,
+ sub_stmt,
+ sub_params,
+ context,
+ True,
+ )
+
+ if self._echo:
+
+ self._log_info(sql_util._long_statement(sub_stmt))
+
+ if batchnum > 1:
+ stats = (
+ f"insertmanyvalues batch {batchnum} "
+ f"of {totalbatches}"
+ )
+
+ if not self.engine.hide_parameters:
+ self._log_info(
+ "[%s] %r",
+ stats,
+ sql_util._repr_params(
+ sub_params,
+ batches=10,
+ ismulti=False,
+ ),
+ )
+ else:
+ self._log_info(
+ "[%s] [SQL parameters hidden due to "
+ "hide_parameters=True]",
+ stats,
+ )
+
+ try:
+ for fn in do_execute_dispatch:
+ if fn(
+ cursor,
+ sub_stmt,
+ sub_params,
+ context,
+ ):
+ break
+ else:
+ dialect.do_execute(cursor, sub_stmt, sub_params, context)
+
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e,
+ sql_util._long_statement(sub_stmt),
+ sub_params,
+ cursor,
+ context,
+ is_sub_exec=True,
+ )
+
+ if engine_events:
+ self.dispatch.after_cursor_execute(
+ self,
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
+ context.executemany,
+ )
+
+ try:
+ context.post_exec()
+
+ result = context._setup_result_proxy()
+
+ except BaseException as e:
+ self._handle_dbapi_exception(
+ e, str_statement, effective_parameters, cursor, context
+ )
+
+ return result
+
def _cursor_execute(
self,
cursor: DBAPICursor,
@@ -1983,6 +2178,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
parameters: Optional[_AnyExecuteParams],
cursor: Optional[DBAPICursor],
context: Optional[ExecutionContext],
+ is_sub_exec: bool = False,
) -> NoReturn:
exc_info = sys.exc_info()
@@ -2001,6 +2197,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
invalidate_pool_on_disconnect = not is_exit_exception
+ ismulti: bool = (
+ not is_sub_exec and context.executemany
+ if context is not None
+ else False
+ )
if self._reentrant_error:
raise exc.DBAPIError.instance(
statement,
@@ -2009,7 +2210,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.dialect.loaded_dbapi.Error,
hide_parameters=self.engine.hide_parameters,
dialect=self.dialect,
- ismulti=context.executemany if context is not None else None,
+ ismulti=ismulti,
).with_traceback(exc_info[2]) from e
self._reentrant_error = True
try:
@@ -2030,9 +2231,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
hide_parameters=self.engine.hide_parameters,
connection_invalidated=self._is_disconnect,
dialect=self.dialect,
- ismulti=context.executemany
- if context is not None
- else None,
+ ismulti=ismulti,
)
else:
sqlalchemy_exception = None
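One observable consequence of the new ``_exec_insertmany_context()`` loop
above is that ``before_cursor_execute`` is dispatched once per rendered
batch (with ``executemany`` set to True), while ``after_cursor_execute``
fires once for the statement overall. A minimal sketch of watching the
batches, assuming an existing ``engine``::

    from sqlalchemy import event

    @event.listens_for(engine, "before_cursor_execute", retval=True)
    def observe_batches(
        conn, cursor, statement, parameters, context, executemany
    ):
        # under insertmanyvalues, this hook receives each batched
        # sub-statement and its parameters
        if executemany:
            print("executing batch:", statement[:80])
        return statement, parameters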
diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py
index 36119ab24..a9b388d71 100644
--- a/lib/sqlalchemy/engine/create.py
+++ b/lib/sqlalchemy/engine/create.py
@@ -58,6 +58,7 @@ def create_engine(
future: Literal[True],
hide_parameters: bool = ...,
implicit_returning: Literal[True] = ...,
+ insertmanyvalues_page_size: int = ...,
isolation_level: _IsolationLevel = ...,
json_deserializer: Callable[..., Any] = ...,
json_serializer: Callable[..., Any] = ...,
@@ -79,6 +80,7 @@ def create_engine(
pool_use_lifo: bool = ...,
plugins: List[str] = ...,
query_cache_size: int = ...,
+ use_insertmanyvalues: bool = ...,
**kwargs: Any,
) -> Engine:
...
@@ -273,6 +275,23 @@ def create_engine(url: Union[str, "_url.URL"], **kwargs: Any) -> Engine:
:paramref:`.Table.implicit_returning` parameter.
+ :param insertmanyvalues_page_size: number of rows to format into an
+ INSERT statement when the statement uses "insertmanyvalues" mode, which is
+ a paged form of bulk insert that is used for many backends when using
+ :term:`executemany` execution typically in conjunction with RETURNING.
+ Defaults to 1000, but may also be subject to dialect-specific limiting
+ factors which may override this value on a per-statement basis.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
+ :ref:`engine_insertmanyvalues_page_size`
+
+ :paramref:`_engine.Connection.execution_options.insertmanyvalues_page_size`
+
:param isolation_level: optional string name of an isolation level
which will be set on all new connections unconditionally.
Isolation levels are typically some subset of the string names
@@ -508,6 +527,15 @@ def create_engine(url: Union[str, "_url.URL"], **kwargs: Any) -> Engine:
.. versionadded:: 1.4
+ :param use_insertmanyvalues: True by default, use the "insertmanyvalues"
+ execution style for INSERT..RETURNING statements by default.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
""" # noqa
if "strategy" in kwargs:
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 3a53f8157..11ab713d0 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -35,6 +35,7 @@ from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import Union
import weakref
from . import characteristics
@@ -44,6 +45,7 @@ from .base import Connection
from .interfaces import CacheStats
from .interfaces import DBAPICursor
from .interfaces import Dialect
+from .interfaces import ExecuteStyle
from .interfaces import ExecutionContext
from .reflection import ObjectKind
from .reflection import ObjectScope
@@ -52,13 +54,16 @@ from .. import exc
from .. import pool
from .. import util
from ..sql import compiler
+from ..sql import dml
from ..sql import expression
from ..sql import type_api
from ..sql._typing import is_tuple_type
+from ..sql.base import _NoArg
from ..sql.compiler import DDLCompiler
from ..sql.compiler import SQLCompiler
from ..sql.elements import quoted_name
from ..sql.schema import default_is_scalar
+from ..util.typing import Final
from ..util.typing import Literal
if typing.TYPE_CHECKING:
@@ -146,7 +151,6 @@ class DefaultDialect(Dialect):
update_returning_multifrom = False
delete_returning_multifrom = False
insert_returning = False
- insert_executemany_returning = False
cte_follows_insert = False
@@ -208,6 +212,10 @@ class DefaultDialect(Dialect):
supports_default_metavalue = False
"""dialect supports INSERT... VALUES (DEFAULT) syntax"""
+ default_metavalue_token = "DEFAULT"
+ """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
+ parenthesis."""
+
# not sure if this is a real thing but the compiler will deliver it
# if this is the only flag enabled.
supports_empty_insert = True
@@ -215,6 +223,13 @@ class DefaultDialect(Dialect):
supports_multivalues_insert = False
+ use_insertmanyvalues: bool = False
+
+ use_insertmanyvalues_wo_returning: bool = False
+
+ insertmanyvalues_page_size: int = 1000
+ insertmanyvalues_max_parameters = 32700
+
supports_is_distinct_from = True
supports_server_side_cursors = False
@@ -272,6 +287,8 @@ class DefaultDialect(Dialect):
supports_native_boolean: Optional[bool] = None,
max_identifier_length: Optional[int] = None,
label_length: Optional[int] = None,
+ insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
+ use_insertmanyvalues: Optional[bool] = None,
# util.deprecated_params decorator cannot render the
# Linting.NO_LINTING constant
compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
@@ -332,6 +349,12 @@ class DefaultDialect(Dialect):
self.label_length = label_length
self.compiler_linting = compiler_linting
+ if use_insertmanyvalues is not None:
+ self.use_insertmanyvalues = use_insertmanyvalues
+
+ if insertmanyvalues_page_size is not _NoArg.NO_ARG:
+ self.insertmanyvalues_page_size = insertmanyvalues_page_size
+
@util.deprecated_property(
"2.0",
"full_returning is deprecated, please use insert_returning, "
@@ -344,6 +367,17 @@ class DefaultDialect(Dialect):
and self.delete_returning
)
+ @property
+ def insert_executemany_returning(self):
+ return (
+ self.insert_returning
+ and self.supports_multivalues_insert
+ and self.use_insertmanyvalues
+ )
+
+ update_executemany_returning = False
+ delete_executemany_returning = False
+
@util.memoized_property
def loaded_dbapi(self) -> ModuleType:
if self.dbapi is None:
@@ -682,6 +716,27 @@ class DefaultDialect(Dialect):
def do_release_savepoint(self, connection, name):
connection.execute(expression.ReleaseSavepointClause(name))
+ def _deliver_insertmanyvalues_batches(
+ self, cursor, statement, parameters, generic_setinputsizes, context
+ ):
+ context = cast(DefaultExecutionContext, context)
+ compiled = cast(SQLCompiler, context.compiled)
+
+ is_returning: Final[bool] = bool(compiled.effective_returning)
+ batch_size = context.execution_options.get(
+ "insertmanyvalues_page_size", self.insertmanyvalues_page_size
+ )
+
+ if is_returning:
+ context._insertmanyvalues_rows = result = []
+
+ for batch_rec in compiled._deliver_insertmanyvalues_batches(
+ statement, parameters, generic_setinputsizes, batch_size
+ ):
+ yield batch_rec
+ if is_returning:
+ result.extend(cursor.fetchall())
+
def do_executemany(self, cursor, statement, parameters, context=None):
cursor.executemany(statement, parameters)
@@ -936,7 +991,8 @@ class DefaultExecutionContext(ExecutionContext):
is_text = False
isddl = False
- executemany = False
+ execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
+
compiled: Optional[Compiled] = None
result_column_struct: Optional[
Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
@@ -982,6 +1038,8 @@ class DefaultExecutionContext(ExecutionContext):
_empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
+ _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
+
@classmethod
def _init_ddl(
cls,
@@ -1061,23 +1119,55 @@ class DefaultExecutionContext(ExecutionContext):
compiled._loose_column_name_matching,
)
- self.isinsert = compiled.isinsert
- self.isupdate = compiled.isupdate
- self.isdelete = compiled.isdelete
+ self.isinsert = ii = compiled.isinsert
+ self.isupdate = iu = compiled.isupdate
+ self.isdelete = id_ = compiled.isdelete
self.is_text = compiled.isplaintext
- if self.isinsert or self.isupdate or self.isdelete:
+ if ii or iu or id_:
if TYPE_CHECKING:
assert isinstance(compiled.statement, UpdateBase)
self.is_crud = True
- self._is_explicit_returning = bool(compiled.statement._returning)
- self._is_implicit_returning = is_implicit_returning = bool(
+ self._is_explicit_returning = ier = bool(
+ compiled.statement._returning
+ )
+ self._is_implicit_returning = iir = is_implicit_returning = bool(
compiled.implicit_returning
)
assert not (
is_implicit_returning and compiled.statement._returning
)
+ if (ier or iir) and compiled.for_executemany:
+ if ii and not self.dialect.insert_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "INSERT..RETURNING when executemany is used"
+ )
+ elif (
+ ii
+ and self.dialect.use_insertmanyvalues
+ and not compiled._insertmanyvalues
+ ):
+ raise exc.InvalidRequestError(
+ 'Statement does not have "insertmanyvalues" '
+ "enabled, can't use INSERT..RETURNING with "
+ "executemany in this case."
+ )
+ elif iu and not self.dialect.update_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "UPDATE..RETURNING when executemany is used"
+ )
+ elif id_ and not self.dialect.delete_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "DELETE..RETURNING when executemany is used"
+ )
+
if not parameters:
self.compiled_parameters = [
compiled.construct_params(
@@ -1096,7 +1186,11 @@ class DefaultExecutionContext(ExecutionContext):
for grp, m in enumerate(parameters)
]
- self.executemany = len(parameters) > 1
+ if len(parameters) > 1:
+ if self.isinsert and compiled._insertmanyvalues:
+ self.execute_style = ExecuteStyle.INSERTMANYVALUES
+ else:
+ self.execute_style = ExecuteStyle.EXECUTEMANY
self.unicode_statement = compiled.string
@@ -1238,7 +1332,8 @@ class DefaultExecutionContext(ExecutionContext):
dialect.execute_sequence_format(p) for p in parameters
]
- self.executemany = len(parameters) > 1
+ if len(parameters) > 1:
+ self.execute_style = ExecuteStyle.EXECUTEMANY
self.statement = self.unicode_statement = statement
@@ -1293,6 +1388,13 @@ class DefaultExecutionContext(ExecutionContext):
else:
return "unknown"
+ @property
+ def executemany(self):
+ return self.execute_style in (
+ ExecuteStyle.EXECUTEMANY,
+ ExecuteStyle.INSERTMANYVALUES,
+ )
+
@util.memoized_property
def identifier_preparer(self):
if self.compiled:
@@ -1555,7 +1657,23 @@ class DefaultExecutionContext(ExecutionContext):
def _setup_dml_or_text_result(self):
compiled = cast(SQLCompiler, self.compiled)
+ strategy = self.cursor_fetch_strategy
+
if self.isinsert:
+ if (
+ self.execute_style is ExecuteStyle.INSERTMANYVALUES
+ and compiled.effective_returning
+ ):
+ strategy = _cursor.FullyBufferedCursorFetchStrategy(
+ self.cursor,
+ initial_buffer=self._insertmanyvalues_rows,
+ # maintain alt cursor description if set by the
+ # dialect, e.g. mssql preserves it
+ alternate_description=(
+ strategy.alternate_cursor_description
+ ),
+ )
+
if compiled.postfetch_lastrowid:
self.inserted_primary_key_rows = (
self._setup_ins_pk_from_lastrowid()
@@ -1564,7 +1682,6 @@ class DefaultExecutionContext(ExecutionContext):
# the default inserted_primary_key_rows accessor will
# return an "empty" primary key collection when accessed.
- strategy = self.cursor_fetch_strategy
if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
strategy = _cursor.BufferedRowCursorFetchStrategy(
self.cursor, self.execution_options
@@ -1675,8 +1792,11 @@ class DefaultExecutionContext(ExecutionContext):
cast(SQLCompiler, self.compiled).postfetch
)
- def _set_input_sizes(self):
- """Given a cursor and ClauseParameters, call the appropriate
+ def _prepare_set_input_sizes(
+ self,
+ ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
+ """Given a cursor and ClauseParameters, prepare arguments
+ in order to call the appropriate
style of ``setinputsizes()`` on the cursor, using DB-API types
from the bind parameter's ``TypeEngine`` objects.
@@ -1691,14 +1811,14 @@ class DefaultExecutionContext(ExecutionContext):
"""
if self.isddl or self.is_text:
- return
+ return None
compiled = cast(SQLCompiler, self.compiled)
inputsizes = compiled._get_set_input_sizes_lookup()
if inputsizes is None:
- return
+ return None
dialect = self.dialect
@@ -1775,12 +1895,8 @@ class DefaultExecutionContext(ExecutionContext):
generic_inputsizes.append(
(escaped_name, dbtype, bindparam.type)
)
- try:
- dialect.do_set_input_sizes(self.cursor, generic_inputsizes, self)
- except BaseException as e:
- self.root_connection._handle_dbapi_exception(
- e, None, None, None, self
- )
+
+ return generic_inputsizes
def _exec_default(self, column, default, type_):
if default.is_sequence:
@@ -1906,7 +2022,7 @@ class DefaultExecutionContext(ExecutionContext):
assert compile_state is not None
if (
isolate_multiinsert_groups
- and self.isinsert
+ and dml.isinsert(compile_state)
and compile_state._has_multi_parameters
):
if column._is_multiparam_column:
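Given the ``insert_executemany_returning`` property above, which derives
from ``insert_returning``, ``supports_multivalues_insert`` and
``use_insertmanyvalues``, a third-party dialect opts in roughly as follows
(class name illustrative; per the commit message, ``use_insertmanyvalues``
is the new opt-in flag)::

    from sqlalchemy.engine.default import DefaultDialect

    class MyThirdPartyDialect(DefaultDialect):
        # capabilities checked by insert_executemany_returning
        insert_returning = True
        supports_multivalues_insert = True

        # opt in to insertmanyvalues (False on DefaultDialect)
        use_insertmanyvalues = True

        # optional tuning knobs, shown at their default values
        insertmanyvalues_page_size = 1000
        insertmanyvalues_max_parameters = 32700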
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index 01b266d68..fb59acbd0 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -18,6 +18,7 @@ from typing import ClassVar
from typing import Collection
from typing import Dict
from typing import Iterable
+from typing import Iterator
from typing import List
from typing import Mapping
from typing import MutableMapping
@@ -62,6 +63,7 @@ if TYPE_CHECKING:
from ..sql.elements import ClauseElement
from ..sql.schema import Column
from ..sql.schema import DefaultGenerator
+ from ..sql.schema import SchemaItem
from ..sql.schema import Sequence as Sequence_SchemaItem
from ..sql.sqltypes import Integer
from ..sql.type_api import _TypeMemoDict
@@ -80,6 +82,28 @@ class CacheStats(Enum):
NO_DIALECT_SUPPORT = 4
+class ExecuteStyle(Enum):
+ """indicates the :term:`DBAPI` cursor method that will be used to invoke
+ a statement."""
+
+ EXECUTE = 0
+ """indicates cursor.execute() will be used"""
+
+ EXECUTEMANY = 1
+ """indicates cursor.executemany() will be used."""
+
+ INSERTMANYVALUES = 2
+ """indicates cursor.execute() will be used with an INSERT where the
+ VALUES expression will be expanded to accommodate for multiple
+ parameter sets
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
+ """
+
+
class DBAPIConnection(Protocol):
"""protocol representing a :pep:`249` database connection.
@@ -235,6 +259,8 @@ _ImmutableExecuteOptions = immutabledict[str, Any]
_ParamStyle = Literal["qmark", "numeric", "named", "format", "pyformat"]
+_GenericSetInputSizesType = List[Tuple[str, Any, "TypeEngine[Any]"]]
+
_IsolationLevel = Literal[
"SERIALIZABLE",
"REPEATABLE READ",
@@ -608,6 +634,8 @@ class Dialect(EventTarget):
driver: str
"""identifying name for the dialect's DBAPI"""
+ dialect_description: str
+
dbapi: Optional[ModuleType]
"""A reference to the DBAPI module object itself.
@@ -748,23 +776,125 @@ class Dialect(EventTarget):
executemany.
"""
+ supports_empty_insert: bool
+ """dialect supports INSERT () VALUES (), i.e. a plain INSERT with no
+ columns in it.
+
+ This is not usually supported; an "empty" insert is typically
+ achieved using either "INSERT..DEFAULT VALUES" or
+ "INSERT ... (col) VALUES (DEFAULT)".
+
+ """
+
supports_default_values: bool
"""dialect supports INSERT... DEFAULT VALUES syntax"""
supports_default_metavalue: bool
- """dialect supports INSERT... VALUES (DEFAULT) syntax"""
+ """dialect supports INSERT...(col) VALUES (DEFAULT) syntax.
- supports_empty_insert: bool
- """dialect supports INSERT () VALUES ()"""
+ Most databases support this in some way, e.g. SQLite supports it using
+ ``VALUES (NULL)``. MS SQL Server supports the syntax also, however it
+ is the only included dialect where we have this disabled, as
+ MSSQL does not support it for the IDENTITY column, which is
+ usually where we like to make use of the feature.
+
+ """
+
+ default_metavalue_token: str = "DEFAULT"
+ """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
+ parenthesis.
+
+ E.g. for SQLite this is the keyword "NULL".
+
+ """
supports_multivalues_insert: bool
"""Target database supports INSERT...VALUES with multiple value
- sets"""
+ sets, i.e. INSERT INTO table (cols) VALUES (...), (...), (...), ...
+
+ """
+
+ insert_executemany_returning: bool
+ """dialect / driver / database supports some means of providing
+ INSERT...RETURNING support when dialect.do_executemany() is used.
+
+ """
+
+ update_executemany_returning: bool
+ """dialect supports UPDATE..RETURNING with executemany."""
+
+ delete_executemany_returning: bool
+ """dialect supports DELETE..RETURNING with executemany."""
+
+ use_insertmanyvalues: bool
+ """if True, indicates "insertmanyvalues" functionality should be used
+ to allow for ``insert_executemany_returning`` behavior, if possible.
+
+ In practice, setting this to True means:
+
+ if ``supports_multivalues_insert``, ``insert_returning`` and
+ ``use_insertmanyvalues`` are all True, the SQL compiler will produce
+ an INSERT that will be interpreted by the :class:`.DefaultDialect`
+ as an :attr:`.ExecuteStyle.INSERTMANYVALUES` execution that allows
+ for INSERT of many rows with RETURNING by rewriting a single-row
+ INSERT statement to have multiple VALUES clauses, also executing
+ the statement multiple times for a series of batches when large numbers
+ of rows are given.
+
+ The parameter is False for the default dialect, and is set to
+ True for SQLAlchemy internal dialects SQLite, MySQL/MariaDB, PostgreSQL,
+ SQL Server. It remains at False for Oracle, which provides native
+ "executemany with RETURNING" support and also does not support
+ ``supports_multivalues_insert``. For MySQL/MariaDB, those MySQL
+ dialects that don't support RETURNING will not report
+ ``insert_executemany_returning`` as True.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`engine_insertmanyvalues`
+
+ """
+
+ use_insertmanyvalues_wo_returning: bool
+ """if True, and use_insertmanyvalues is also True, INSERT statements
+ that don't include RETURNING will also use "insertmanyvalues".
+
+ .. versionadded:: 2.0
+
+ """
+
+ insertmanyvalues_page_size: int
+ """Number of rows to render into an individual INSERT..VALUES() statement
+ for :attr:`.ExecuteStyle.INSERTMANYVALUES` executions.
+
+ The default dialect defaults this to 1000.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :paramref:`_engine.Connection.execution_options.insertmanyvalues_page_size` -
+ execution option available on :class:`_engine.Connection`, statements
+
+ """ # noqa: E501
+
+ insertmanyvalues_max_parameters: int
+ """Alternate to insertmanyvalues_page_size, will additionally limit
+ page size based on number of parameters total in the statement.
+
+
+ """
preexecute_autoincrement_sequences: bool
"""True if 'implicit' primary key functions must be executed separately
- in order to get their value. This is currently oriented towards
- PostgreSQL.
+ in order to get their value, if RETURNING is not used.
+
+ This is currently oriented towards PostgreSQL when the
+ ``implicit_returning=False`` parameter is used on a :class:`.Table`
+ object.
+
"""
insert_returning: bool
@@ -810,6 +940,13 @@ class Dialect(EventTarget):
"""
+ supports_identity_columns: bool
+ """target database supports IDENTITY"""
+
+ cte_follows_insert: bool
+ """target database, when given a CTE with an INSERT statement, needs
+ the CTE to be below the INSERT"""
+
colspecs: MutableMapping[Type["TypeEngine[Any]"], Type["TypeEngine[Any]"]]
"""A dictionary of TypeEngine classes from sqlalchemy.types mapped
to subclasses that are specific to the dialect class. This
@@ -860,7 +997,7 @@ class Dialect(EventTarget):
"""
construct_arguments: Optional[
- List[Tuple[Type[ClauseElement], Mapping[str, Any]]]
+ List[Tuple[Type[Union[SchemaItem, ClauseElement]], Mapping[str, Any]]]
] = None
"""Optional set of argument specifiers for various SQLAlchemy
constructs, typically schema items.
@@ -1007,19 +1144,6 @@ class Dialect(EventTarget):
_bind_typing_render_casts: bool
- supports_identity_columns: bool
- """target database supports IDENTITY"""
-
- cte_follows_insert: bool
- """target database, when given a CTE with an INSERT statement, needs
- the CTE to be below the INSERT"""
-
- insert_executemany_returning: bool
- """dialect / driver / database supports some means of providing RETURNING
- support when dialect.do_executemany() is used.
-
- """
-
_type_memos: MutableMapping[TypeEngine[Any], "_TypeMemoDict"]
def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
@@ -1826,8 +1950,8 @@ class Dialect(EventTarget):
def do_set_input_sizes(
self,
cursor: DBAPICursor,
- list_of_tuples: List[Tuple[str, Any, TypeEngine[Any]]],
- context: "ExecutionContext",
+ list_of_tuples: _GenericSetInputSizesType,
+ context: ExecutionContext,
) -> Any:
"""invoke the cursor.setinputsizes() method with appropriate arguments
@@ -1961,12 +2085,35 @@ class Dialect(EventTarget):
raise NotImplementedError()
+ def _deliver_insertmanyvalues_batches(
+ self,
+ cursor: DBAPICursor,
+ statement: str,
+ parameters: _DBAPIMultiExecuteParams,
+ generic_setinputsizes: Optional[_GenericSetInputSizesType],
+ context: ExecutionContext,
+ ) -> Iterator[
+ Tuple[
+ str,
+ _DBAPISingleExecuteParams,
+ _GenericSetInputSizesType,
+ int,
+ int,
+ ]
+ ]:
+ """convert executemany parameters for an INSERT into an iterator
+ of statement/single execute values, used by the insertmanyvalues
+ feature.
+
+ """
+ raise NotImplementedError()
+
def do_executemany(
self,
cursor: DBAPICursor,
statement: str,
parameters: _DBAPIMultiExecuteParams,
- context: Optional["ExecutionContext"] = None,
+ context: Optional[ExecutionContext] = None,
) -> None:
"""Provide an implementation of ``cursor.executemany(statement,
parameters)``."""
@@ -2743,7 +2890,9 @@ class ExecutionContext:
These are always stored as a list of parameter entries. A single-element
list corresponds to a ``cursor.execute()`` call and a multiple-element
- list corresponds to ``cursor.executemany()``.
+ list corresponds to ``cursor.executemany()``, except in the case
+ of :attr:`.ExecuteStyle.INSERTMANYVALUES` which will use
+ ``cursor.execute()`` one or more times.
"""
@@ -2756,8 +2905,23 @@ class ExecutionContext:
isupdate: bool
"""True if the statement is an UPDATE."""
+ execute_style: ExecuteStyle
+ """the style of DBAPI cursor method that will be used to execute
+ a statement.
+
+ .. versionadded:: 2.0
+
+ """
+
executemany: bool
- """True if the parameters have determined this to be an executemany"""
+ """True if the context has a list of more than one parameter set.
+
+    Historically this attribute indicated whether ``cursor.execute()`` or
+    ``cursor.executemany()`` would be used; it may now also indicate an
+    "insertmanyvalues" execution, which makes use of one or more
+    ``cursor.execute()`` calls.
+
+ """
prefetch_cols: util.generic_fn_descriptor[Optional[Sequence[Column[Any]]]]
"""a list of Column objects for which a client-side default
@@ -2824,7 +2988,9 @@ class ExecutionContext:
) -> Any:
raise NotImplementedError()
- def _set_input_sizes(self) -> None:
+ def _prepare_set_input_sizes(
+ self,
+ ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
raise NotImplementedError()
def _get_cache_stats(self) -> str:
diff --git a/lib/sqlalchemy/orm/context.py b/lib/sqlalchemy/orm/context.py
index 4f24103df..dc96f8c3c 100644
--- a/lib/sqlalchemy/orm/context.py
+++ b/lib/sqlalchemy/orm/context.py
@@ -776,6 +776,10 @@ class FromStatement(GroupedElement, Generative, TypedReturnsRows[_TP]):
return self.element._all_selected_columns
@property
+ def _return_defaults(self):
+ return self.element._return_defaults if is_dml(self.element) else None
+
+ @property
def _returning(self):
return self.element._returning if is_dml(self.element) else None
diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py
index 6310f5b1b..abd528986 100644
--- a/lib/sqlalchemy/orm/persistence.py
+++ b/lib/sqlalchemy/orm/persistence.py
@@ -1489,7 +1489,7 @@ def _postfetch(
prefetch_cols = result.context.compiled.prefetch
postfetch_cols = result.context.compiled.postfetch
- returning_cols = result.context.compiled.returning
+ returning_cols = result.context.compiled.effective_returning
if (
mapper.version_id_col is not None
diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py
index 20ffb385d..a690da0d5 100644
--- a/lib/sqlalchemy/orm/session.py
+++ b/lib/sqlalchemy/orm/session.py
@@ -3989,14 +3989,6 @@ class Session(_SessionClassMethods, EventTarget):
and SQL clause support are **silently omitted** in favor of raw
INSERT/UPDATES of records.
- Please note that newer versions of SQLAlchemy are **greatly
- improving the efficiency** of the standard flush process. It is
- **strongly recommended** to not use the bulk methods as they
- represent a forking of SQLAlchemy's functionality and are slowly
- being moved into legacy status. New features such as
- :ref:`orm_dml_returning_objects` are both more efficient than
- the "bulk" methods and provide more predictable functionality.
-
**Please read the list of caveats at**
:ref:`bulk_operations_caveats` **before using this method, and
fully test and confirm the functionality of all code developed
@@ -4108,8 +4100,6 @@ class Session(_SessionClassMethods, EventTarget):
organizing the values within them across the tables to which
the given mapper is mapped.
- .. versionadded:: 1.0.0
-
.. warning::
The bulk insert feature allows for a lower-latency INSERT
@@ -4118,14 +4108,6 @@ class Session(_SessionClassMethods, EventTarget):
and SQL clause support are **silently omitted** in favor of raw
INSERT of records.
- Please note that newer versions of SQLAlchemy are **greatly
- improving the efficiency** of the standard flush process. It is
- **strongly recommended** to not use the bulk methods as they
- represent a forking of SQLAlchemy's functionality and are slowly
- being moved into legacy status. New features such as
- :ref:`orm_dml_returning_objects` are both more efficient than
- the "bulk" methods and provide more predictable functionality.
-
**Please read the list of caveats at**
:ref:`bulk_operations_caveats` **before using this method, and
fully test and confirm the functionality of all code developed
@@ -4142,19 +4124,18 @@ class Session(_SessionClassMethods, EventTarget):
such as a joined-inheritance mapping, each dictionary must contain all
keys to be populated into all tables.
- :param return_defaults: when True, rows that are missing values which
- generate defaults, namely integer primary key defaults and sequences,
- will be inserted **one at a time**, so that the primary key value
- is available. In particular this will allow joined-inheritance
- and other multi-table mappings to insert correctly without the need
- to provide primary
- key values ahead of time; however,
- :paramref:`.Session.bulk_insert_mappings.return_defaults`
- **greatly reduces the performance gains** of the method overall.
- If the rows
- to be inserted only refer to a single table, then there is no
- reason this flag should be set as the returned default information
- is not used.
+ :param return_defaults: when True, the INSERT process will be altered
+ to ensure that newly generated primary key values will be fetched.
+ The rationale for this parameter is typically to enable
+ :ref:`Joined Table Inheritance <joined_inheritance>` mappings to
+ be bulk inserted.
+
+ .. note:: for backends that don't support RETURNING, the
+ :paramref:`_orm.Session.bulk_insert_mappings.return_defaults`
+ parameter can significantly decrease performance as INSERT
+ statements can no longer be batched. See
+ :ref:`engine_insertmanyvalues`
+ for background on which backends are affected.
:param render_nulls: When True, a value of ``None`` will result
in a NULL value being included in the INSERT statement, rather
@@ -4178,8 +4159,6 @@ class Session(_SessionClassMethods, EventTarget):
to ensure that no server-side default functions need to be
invoked for the operation as a whole.
- .. versionadded:: 1.1
-
.. seealso::
:ref:`bulk_operations`
@@ -4211,8 +4190,6 @@ class Session(_SessionClassMethods, EventTarget):
state management features in use, reducing latency when updating
large numbers of simple rows.
- .. versionadded:: 1.0.0
-
.. warning::
The bulk update feature allows for a lower-latency UPDATE
@@ -4221,14 +4198,6 @@ class Session(_SessionClassMethods, EventTarget):
and SQL clause support are **silently omitted** in favor of raw
UPDATES of records.
- Please note that newer versions of SQLAlchemy are **greatly
- improving the efficiency** of the standard flush process. It is
- **strongly recommended** to not use the bulk methods as they
- represent a forking of SQLAlchemy's functionality and are slowly
- being moved into legacy status. New features such as
- :ref:`orm_dml_returning_objects` are both more efficient than
- the "bulk" methods and provide more predictable functionality.
-
**Please read the list of caveats at**
:ref:`bulk_operations_caveats` **before using this method, and
fully test and confirm the functionality of all code developed
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index 1d13ffa9a..201324a2a 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -94,6 +94,7 @@ if typing.TYPE_CHECKING:
from .elements import BindParameter
from .elements import ColumnClause
from .elements import ColumnElement
+ from .elements import KeyedColumnElement
from .elements import Label
from .functions import Function
from .selectable import AliasedReturnsRows
@@ -390,6 +391,19 @@ class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
class ExpandedState(NamedTuple):
+ """represents state to use when producing "expanded" and
+ "post compile" bound parameters for a statement.
+
+ "expanded" parameters are parameters that are generated at
+ statement execution time to suit a number of parameters passed, the most
+ prominent example being the individual elements inside of an IN expression.
+
+ "post compile" parameters are parameters where the SQL literal value
+ will be rendered into the SQL statement at execution time, rather than
+ being passed as separate parameters to the driver.
+
+ """
+
statement: str
additional_parameters: _CoreSingleExecuteParams
processors: Mapping[str, _BindProcessorType[Any]]
@@ -397,7 +411,23 @@ class ExpandedState(NamedTuple):
parameter_expansion: Mapping[str, List[str]]
+class _InsertManyValues(NamedTuple):
+ """represents state to use for executing an "insertmanyvalues" statement"""
+
+ is_default_expr: bool
+ single_values_expr: str
+ insert_crud_params: List[Tuple[KeyedColumnElement[Any], str, str]]
+ num_positional_params_counted: int
+
+
class Linting(IntEnum):
+ """represent preferences for the 'SQL linting' feature.
+
+ this feature currently includes support for flagging cartesian products
+ in SQL statements.
+
+ """
+
NO_LINTING = 0
"Disable all linting."
@@ -419,6 +449,9 @@ NO_LINTING, COLLECT_CARTESIAN_PRODUCTS, WARN_LINTING, FROM_LINTING = tuple(
class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
+ """represents current state for the "cartesian product" detection
+ feature."""
+
def lint(self, start=None):
froms = self.froms
if not froms:
@@ -762,8 +795,6 @@ class SQLCompiler(Compiled):
is_sql = True
- _result_columns: List[ResultColumnsEntry]
-
compound_keywords = COMPOUND_KEYWORDS
isdelete: bool = False
@@ -810,12 +841,6 @@ class SQLCompiler(Compiled):
"""major statements such as SELECT, INSERT, UPDATE, DELETE are
tracked in this stack using an entry format."""
- result_columns: List[ResultColumnsEntry]
- """relates label names in the final SQL to a tuple of local
- column/label name, ColumnElement object (if any) and
- TypeEngine. CursorResult uses this for type processing and
- column targeting"""
-
returning_precedes_values: bool = False
"""set to True classwide to generate RETURNING
clauses before the VALUES or WHERE clause (i.e. MSSQL)
@@ -835,6 +860,12 @@ class SQLCompiler(Compiled):
driver/DB enforces this
"""
+ _result_columns: List[ResultColumnsEntry]
+ """relates label names in the final SQL to a tuple of local
+ column/label name, ColumnElement object (if any) and
+ TypeEngine. CursorResult uses this for type processing and
+ column targeting"""
+
_textual_ordered_columns: bool = False
"""tell the result object that the column names as rendered are important,
but they are also "ordered" vs. what is in the compiled object here.
@@ -881,14 +912,9 @@ class SQLCompiler(Compiled):
"""
- insert_single_values_expr: Optional[str] = None
- """When an INSERT is compiled with a single set of parameters inside
- a VALUES expression, the string is assigned here, where it can be
- used for insert batching schemes to rewrite the VALUES expression.
+ _insertmanyvalues: Optional[_InsertManyValues] = None
- .. versionadded:: 1.3.8
-
- """
+ _insert_crud_params: Optional[crud._CrudParamSequence] = None
literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset()
"""bindparameter objects that are rendered as literal values at statement
@@ -1072,6 +1098,25 @@ class SQLCompiler(Compiled):
if self._render_postcompile:
self._process_parameters_for_postcompile(_populate_self=True)
+ @property
+ def insert_single_values_expr(self) -> Optional[str]:
+ """When an INSERT is compiled with a single set of parameters inside
+ a VALUES expression, the string is assigned here, where it can be
+ used for insert batching schemes to rewrite the VALUES expression.
+
+ .. versionadded:: 1.3.8
+
+        .. versionchanged:: 2.0 This attribute is no longer used by
+            SQLAlchemy's built-in dialects, in favor of the currently
+            internal ``_insertmanyvalues`` structure that is used only by
+            :class:`.SQLCompiler`.
+
+ """
+ if self._insertmanyvalues is None:
+ return None
+ else:
+ return self._insertmanyvalues.single_values_expr
+
@util.ro_memoized_property
def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
"""The effective "returning" columns for INSERT, UPDATE or DELETE.
@@ -1620,10 +1665,13 @@ class SQLCompiler(Compiled):
param_key_getter = self._within_exec_param_key_getter
+ assert self.compile_state is not None
+ statement = self.compile_state.statement
+
if TYPE_CHECKING:
- assert isinstance(self.statement, Insert)
+ assert isinstance(statement, Insert)
- table = self.statement.table
+ table = statement.table
getters = [
(operator.methodcaller("get", param_key_getter(col), None), col)
@@ -1697,11 +1745,14 @@ class SQLCompiler(Compiled):
else:
result = util.preloaded.engine_result
+ assert self.compile_state is not None
+ statement = self.compile_state.statement
+
if TYPE_CHECKING:
- assert isinstance(self.statement, Insert)
+ assert isinstance(statement, Insert)
param_key_getter = self._within_exec_param_key_getter
- table = self.statement.table
+ table = statement.table
returning = self.implicit_returning
assert returning is not None
@@ -4506,7 +4557,202 @@ class SQLCompiler(Compiled):
)
return dialect_hints, table_text
+ def _insert_stmt_should_use_insertmanyvalues(self, statement):
+ return (
+ self.dialect.supports_multivalues_insert
+ and self.dialect.use_insertmanyvalues
+ # note self.implicit_returning or self._result_columns
+ # implies self.dialect.insert_returning capability
+ and (
+ self.dialect.use_insertmanyvalues_wo_returning
+ or self.implicit_returning
+ or self._result_columns
+ )
+ )
+
+ def _deliver_insertmanyvalues_batches(
+ self, statement, parameters, generic_setinputsizes, batch_size
+ ):
+ imv = self._insertmanyvalues
+ assert imv is not None
+
+ executemany_values = f"({imv.single_values_expr})"
+
+ lenparams = len(parameters)
+ if imv.is_default_expr and not self.dialect.supports_default_metavalue:
+ # backend doesn't support
+ # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
+ # at the moment this is basically SQL Server due to
+ # not being able to use DEFAULT for identity column
+ # just yield out that many single statements! still
+ # faster than a whole connection.execute() call ;)
+ #
+ # note we still are taking advantage of the fact that we know
+ # we are using RETURNING. The generalized approach of fetching
+ # cursor.lastrowid etc. still goes through the more heavyweight
+ # "ExecutionContext per statement" system as it isn't usable
+ # as a generic "RETURNING" approach
+ for batchnum, param in enumerate(parameters, 1):
+ yield (
+ statement,
+ param,
+ generic_setinputsizes,
+ batchnum,
+ lenparams,
+ )
+ return
+ else:
+ statement = statement.replace(
+ executemany_values, "__EXECMANY_TOKEN__"
+ )
+
+ # Use optional insertmanyvalues_max_parameters
+ # to further shrink the batch size so that there are no more than
+ # insertmanyvalues_max_parameters params.
+ # Currently used by SQL Server, which limits statements to 2100 bound
+ # parameters (actually 2099).
+ max_params = self.dialect.insertmanyvalues_max_parameters
+ if max_params:
+ total_num_of_params = len(self.bind_names)
+ num_params_per_batch = len(imv.insert_crud_params)
+ num_params_outside_of_batch = (
+ total_num_of_params - num_params_per_batch
+ )
+ batch_size = min(
+ batch_size,
+ (
+ (max_params - num_params_outside_of_batch)
+ // num_params_per_batch
+ ),
+ )
+
+ batches = list(parameters)
+
+ processed_setinputsizes = None
+ batchnum = 1
+ total_batches = lenparams // batch_size + (
+ 1 if lenparams % batch_size else 0
+ )
+
+ insert_crud_params = imv.insert_crud_params
+ assert insert_crud_params is not None
+
+ escaped_bind_names: Mapping[str, str]
+ if not self.positional:
+ if self.escaped_bind_names:
+ escaped_bind_names = self.escaped_bind_names
+ else:
+ escaped_bind_names = {}
+
+ all_keys = set(parameters[0])
+
+ escaped_insert_crud_params: Sequence[Any] = [
+ (escaped_bind_names.get(col.key, col.key), formatted)
+ for col, _, formatted in insert_crud_params
+ ]
+
+ keys_to_replace = all_keys.intersection(
+ key for key, _ in escaped_insert_crud_params
+ )
+ base_parameters = {
+ key: parameters[0][key]
+ for key in all_keys.difference(keys_to_replace)
+ }
+ executemany_values_w_comma = ""
+ else:
+ escaped_insert_crud_params = ()
+ keys_to_replace = set()
+ base_parameters = {}
+ executemany_values_w_comma = f"({imv.single_values_expr}), "
+
+ while batches:
+ batch = batches[0:batch_size]
+ batches[0:batch_size] = []
+
+ if generic_setinputsizes:
+ # if setinputsizes is present, expand this collection to
+ # suit the batch length as well
+ # currently this will be mssql+pyodbc for internal dialects
+ processed_setinputsizes = [
+ (new_key, len_, typ)
+ for new_key, len_, typ in (
+ (f"{key}_{index}", len_, typ)
+ for index in range(len(batch))
+ for key, len_, typ in generic_setinputsizes
+ )
+ ]
+
+ replaced_parameters: Any
+ if self.positional:
+                # the assumption here is that any parameters that are not
+                # in the VALUES clause are expected to be parameterized
+                # expressions in the RETURNING (or maybe ON CONFLICT)
+                # clause.  Whichever sequence comes first in the compiler's
+                # INSERT statement therefore tells us where to expand the
+                # parameters; otherwise, we probably shouldn't be doing
+                # insertmanyvalues on the statement.
+
+ num_ins_params = imv.num_positional_params_counted
+
+ if num_ins_params == len(batch[0]):
+ extra_params = ()
+ batch_iterator: Iterable[Tuple[Any, ...]] = batch
+ elif self.returning_precedes_values:
+ extra_params = batch[0][:-num_ins_params]
+ batch_iterator = (b[-num_ins_params:] for b in batch)
+ else:
+ extra_params = batch[0][num_ins_params:]
+ batch_iterator = (b[:num_ins_params] for b in batch)
+
+ replaced_statement = statement.replace(
+ "__EXECMANY_TOKEN__",
+ (executemany_values_w_comma * len(batch))[:-2],
+ )
+
+ replaced_parameters = tuple(
+ itertools.chain.from_iterable(batch_iterator)
+ )
+ if self.returning_precedes_values:
+ replaced_parameters = extra_params + replaced_parameters
+ else:
+ replaced_parameters = replaced_parameters + extra_params
+ else:
+ replaced_values_clauses = []
+ replaced_parameters = base_parameters.copy()
+
+ for i, param in enumerate(batch):
+ new_tokens = [
+ formatted.replace(key, f"{key}__{i}")
+ if key in param
+ else formatted
+ for key, formatted in escaped_insert_crud_params
+ ]
+ replaced_values_clauses.append(
+ f"({', '.join(new_tokens)})"
+ )
+
+ replaced_parameters.update(
+ {f"{key}__{i}": param[key] for key in keys_to_replace}
+ )
+
+ replaced_statement = statement.replace(
+ "__EXECMANY_TOKEN__",
+ ", ".join(replaced_values_clauses),
+ )
+
+ yield (
+ replaced_statement,
+ replaced_parameters,
+ processed_setinputsizes,
+ batchnum,
+ total_batches,
+ )
+ batchnum += 1
+
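+    # A minimal illustration of the rewrite performed by the method above;
+    # the statement and parameter names are illustrative only.  A compiled
+    # single-row statement such as
+    #
+    #     INSERT INTO t (a, b) VALUES (:a, :b) RETURNING t.id
+    #
+    # has its single VALUES group replaced with __EXECMANY_TOKEN__, which
+    # is then expanded per batch; for a batch of two parameter sets the
+    # rendered statement becomes
+    #
+    #     INSERT INTO t (a, b) VALUES (:a__0, :b__0), (:a__1, :b__1)
+    #     RETURNING t.id
+    #
+    # with the parameter dictionary keys renamed to match.
+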
def visit_insert(self, insert_stmt, **kw):
+
compile_state = insert_stmt._compile_state_factory(
insert_stmt, self, **kw
)
@@ -4529,9 +4775,24 @@ class SQLCompiler(Compiled):
}
)
+ positiontup_before = positiontup_after = 0
+
+ # for positional, insertmanyvalues needs to know how many
+ # bound parameters are in the VALUES sequence; there's no simple
+ # rule because default expressions etc. can have zero or more
+        # params inside them. After multiple attempts to figure this out,
+        # this very simplistic "count before, then count after" approach
+        # works and likely incurs the fewest callcounts, though it looks
+        # clumsy
+ if self.positiontup:
+ positiontup_before = len(self.positiontup)
+
crud_params_struct = crud._get_crud_params(
self, insert_stmt, compile_state, toplevel, **kw
)
+
+ if self.positiontup:
+ positiontup_after = len(self.positiontup)
+
crud_params_single = crud_params_struct.single_params
if (
@@ -4584,14 +4845,34 @@ class SQLCompiler(Compiled):
)
if self.implicit_returning or insert_stmt._returning:
+
+ # if returning clause is rendered first, capture bound parameters
+ # while visiting and place them prior to the VALUES oriented
+ # bound parameters, when using positional parameter scheme
+ rpv = self.returning_precedes_values
+ flip_pt = rpv and self.positional
+ if flip_pt:
+ pt: Optional[List[str]] = self.positiontup
+ temp_pt: Optional[List[str]]
+ self.positiontup = temp_pt = []
+ else:
+ temp_pt = pt = None
+
returning_clause = self.returning_clause(
insert_stmt,
self.implicit_returning or insert_stmt._returning,
populate_result_map=toplevel,
)
- if self.returning_precedes_values:
+ if flip_pt:
+ if TYPE_CHECKING:
+ assert temp_pt is not None
+ assert pt is not None
+ self.positiontup = temp_pt + pt
+
+ if rpv:
text += " " + returning_clause
+
else:
returning_clause = None
@@ -4614,6 +4895,18 @@ class SQLCompiler(Compiled):
text += " %s" % select_text
elif not crud_params_single and supports_default_values:
text += " DEFAULT VALUES"
+ if toplevel and self._insert_stmt_should_use_insertmanyvalues(
+ insert_stmt
+ ):
+ self._insertmanyvalues = _InsertManyValues(
+ True,
+ self.dialect.default_metavalue_token,
+ cast(
+ "List[Tuple[KeyedColumnElement[Any], str, str]]",
+ crud_params_single,
+ ),
+ (positiontup_after - positiontup_before),
+ )
elif compile_state._has_multi_parameters:
text += " VALUES %s" % (
", ".join(
@@ -4623,6 +4916,8 @@ class SQLCompiler(Compiled):
)
)
else:
+ # TODO: why is third element of crud_params_single not str
+ # already?
insert_single_values_expr = ", ".join(
[
value
@@ -4631,9 +4926,20 @@ class SQLCompiler(Compiled):
)
]
)
+
text += " VALUES (%s)" % insert_single_values_expr
- if toplevel:
- self.insert_single_values_expr = insert_single_values_expr
+ if toplevel and self._insert_stmt_should_use_insertmanyvalues(
+ insert_stmt
+ ):
+ self._insertmanyvalues = _InsertManyValues(
+ False,
+ insert_single_values_expr,
+ cast(
+ "List[Tuple[KeyedColumnElement[Any], str, str]]",
+ crud_params_single,
+ ),
+ positiontup_after - positiontup_before,
+ )
if insert_stmt._post_values_clause is not None:
post_values_clause = self.process(
diff --git a/lib/sqlalchemy/sql/crud.py b/lib/sqlalchemy/sql/crud.py
index 81151a26b..b13377a59 100644
--- a/lib/sqlalchemy/sql/crud.py
+++ b/lib/sqlalchemy/sql/crud.py
@@ -32,6 +32,7 @@ from . import coercions
from . import dml
from . import elements
from . import roles
+from .dml import isinsert as _compile_state_isinsert
from .elements import ColumnClause
from .schema import default_is_clause_element
from .schema import default_is_sequence
@@ -73,19 +74,18 @@ def _as_dml_column(c: ColumnElement[Any]) -> ColumnClause[Any]:
return c
-class _CrudParams(NamedTuple):
- single_params: Sequence[
- Tuple[ColumnElement[Any], str, Optional[Union[str, _SQLExprDefault]]]
- ]
- all_multi_params: List[
- Sequence[
- Tuple[
- ColumnClause[Any],
- str,
- str,
- ]
- ]
+_CrudParamSequence = Sequence[
+ Tuple[
+ "ColumnElement[Any]",
+ str,
+ Optional[Union[str, "_SQLExprDefault"]],
]
+]
+
+
+class _CrudParams(NamedTuple):
+ single_params: _CrudParamSequence
+ all_multi_params: List[Sequence[Tuple[ColumnClause[Any], str, str]]]
def _get_crud_params(
@@ -144,6 +144,12 @@ def _get_crud_params(
compiler._get_bind_name_for_col = _col_bind_name
+ if stmt._returning and stmt._return_defaults:
+ raise exc.CompileError(
+ "Can't compile statement that includes returning() and "
+ "return_defaults() simultaneously"
+ )
+
# no parameters in the statement, no parameters in the
# compiled params - return binds for all columns
if compiler.column_keys is None and compile_state._no_parameters:
@@ -164,7 +170,10 @@ def _get_crud_params(
]
spd: Optional[MutableMapping[_DMLColumnElement, Any]]
- if compile_state._has_multi_parameters:
+ if (
+ _compile_state_isinsert(compile_state)
+ and compile_state._has_multi_parameters
+ ):
mp = compile_state._multi_parameters
assert mp is not None
spd = mp[0]
@@ -227,7 +236,7 @@ def _get_crud_params(
kw,
)
- if compile_state.isinsert and stmt._select_names:
+ if _compile_state_isinsert(compile_state) and stmt._select_names:
# is an insert from select, is not a multiparams
assert not compile_state._has_multi_parameters
@@ -272,7 +281,10 @@ def _get_crud_params(
% (", ".join("%s" % (c,) for c in check))
)
- if compile_state._has_multi_parameters:
+ if (
+ _compile_state_isinsert(compile_state)
+ and compile_state._has_multi_parameters
+ ):
# is a multiparams, is not an insert from a select
assert not stmt._select_names
multi_extended_values = _extend_values_for_multiparams(
@@ -297,7 +309,7 @@ def _get_crud_params(
(
_as_dml_column(stmt.table.columns[0]),
compiler.preparer.format_column(stmt.table.columns[0]),
- "DEFAULT",
+ compiler.dialect.default_metavalue_token,
)
]
@@ -500,7 +512,7 @@ def _scan_insert_from_select_cols(
ins_from_select = ins_from_select._generate()
# copy raw_columns
ins_from_select._raw_columns = list(ins_from_select._raw_columns) + [
- expr for col, col_expr, expr in add_select_cols
+ expr for _, _, expr in add_select_cols
]
compiler.stack[-1]["insert_from_select"] = ins_from_select
@@ -539,7 +551,8 @@ def _scan_cols(
else:
cols = stmt.table.columns
- if compile_state.isinsert and not compile_state._has_multi_parameters:
+ isinsert = _compile_state_isinsert(compile_state)
+ if isinsert and not compile_state._has_multi_parameters:
# new rules for #7998. fetch lastrowid or implicit returning
# for autoincrement column even if parameter is NULL, for DBs that
# override NULL param for primary key (sqlite, mysql/mariadb)
@@ -575,7 +588,7 @@ def _scan_cols(
kw,
)
- elif compile_state.isinsert:
+ elif isinsert:
# no parameter is present and it's an insert.
if c.primary_key and need_pks:
@@ -683,7 +696,8 @@ def _append_param_parameter(
value,
required=value is REQUIRED,
name=_col_bind_name(c)
- if not compile_state._has_multi_parameters
+ if not _compile_state_isinsert(compile_state)
+ or not compile_state._has_multi_parameters
else "%s_m0" % _col_bind_name(c),
**kw,
)
@@ -706,7 +720,8 @@ def _append_param_parameter(
c,
value,
name=_col_bind_name(c)
- if not compile_state._has_multi_parameters
+ if not _compile_state_isinsert(compile_state)
+ or not compile_state._has_multi_parameters
else "%s_m0" % _col_bind_name(c),
**kw,
)
@@ -922,11 +937,19 @@ def _append_param_insert_select_hasdefault(
not c.default.optional or not compiler.dialect.sequences_optional
):
values.append(
- (c, compiler.preparer.format_column(c), c.default.next_value())
+ (
+ c,
+ compiler.preparer.format_column(c),
+ c.default.next_value(),
+ )
)
elif default_is_clause_element(c.default):
values.append(
- (c, compiler.preparer.format_column(c), c.default.arg.self_group())
+ (
+ c,
+ compiler.preparer.format_column(c),
+ c.default.arg.self_group(),
+ )
)
else:
values.append(
@@ -1105,14 +1128,10 @@ def _process_multiparam_default_bind(
return compiler.process(c.default, **kw)
else:
col = _multiparam_column(c, index)
- if isinstance(stmt, dml.Insert):
- return _create_insert_prefetch_bind_param(
- compiler, col, process=True, **kw
- )
- else:
- return _create_update_prefetch_bind_param(
- compiler, col, process=True, **kw
- )
+ assert isinstance(stmt, dml.Insert)
+ return _create_insert_prefetch_bind_param(
+ compiler, col, process=True, **kw
+ )
def _get_update_multitable_params(
@@ -1205,13 +1224,7 @@ def _extend_values_for_multiparams(
mp = compile_state._multi_parameters
assert mp is not None
for i, row in enumerate(mp[1:]):
- extension: List[
- Tuple[
- ColumnClause[Any],
- str,
- str,
- ]
- ] = []
+ extension: List[Tuple[ColumnClause[Any], str, str]] = []
row = {_column_as_key(key): v for key, v in row.items()}
@@ -1292,7 +1305,7 @@ def _get_returning_modifiers(compiler, stmt, compile_state, toplevel):
"""
need_pks = (
toplevel
- and compile_state.isinsert
+ and _compile_state_isinsert(compile_state)
and not stmt._inline
and (
not compiler.for_executemany
@@ -1348,7 +1361,7 @@ def _get_returning_modifiers(compiler, stmt, compile_state, toplevel):
if implicit_returning:
postfetch_lastrowid = False
- if compile_state.isinsert:
+ if _compile_state_isinsert(compile_state):
implicit_return_defaults = implicit_returning and stmt._return_defaults
elif compile_state.isupdate:
implicit_return_defaults = (
diff --git a/lib/sqlalchemy/sql/dml.py b/lib/sqlalchemy/sql/dml.py
index eb612f394..a08e38800 100644
--- a/lib/sqlalchemy/sql/dml.py
+++ b/lib/sqlalchemy/sql/dml.py
@@ -118,7 +118,6 @@ class DMLState(CompileState):
] = None
_ordered_values: Optional[List[Tuple[_DMLColumnElement, Any]]] = None
_parameter_ordering: Optional[List[_DMLColumnElement]] = None
- _has_multi_parameters = False
_primary_table: FromClause
_supports_implicit_returning = True
@@ -202,64 +201,10 @@ class DMLState(CompileState):
froms.extend(all_tables[1:])
return primary_table, froms
- def _process_multi_values(self, statement: ValuesBase) -> None:
- if not statement._supports_multi_parameters:
- raise exc.InvalidRequestError(
- "%s construct does not support "
- "multiple parameter sets." % statement.__visit_name__.upper()
- )
- else:
- assert isinstance(statement, Insert)
-
- # which implies...
- # assert isinstance(statement.table, TableClause)
-
- for parameters in statement._multi_values:
- multi_parameters: List[MutableMapping[_DMLColumnElement, Any]] = [
- {
- c.key: value
- for c, value in zip(statement.table.c, parameter_set)
- }
- if isinstance(parameter_set, collections_abc.Sequence)
- else parameter_set
- for parameter_set in parameters
- ]
-
- if self._no_parameters:
- self._no_parameters = False
- self._has_multi_parameters = True
- self._multi_parameters = multi_parameters
- self._dict_parameters = self._multi_parameters[0]
- elif not self._has_multi_parameters:
- self._cant_mix_formats_error()
- else:
- assert self._multi_parameters
- self._multi_parameters.extend(multi_parameters)
-
def _process_values(self, statement: ValuesBase) -> None:
if self._no_parameters:
- self._has_multi_parameters = False
self._dict_parameters = statement._values
self._no_parameters = False
- elif self._has_multi_parameters:
- self._cant_mix_formats_error()
-
- def _process_ordered_values(self, statement: ValuesBase) -> None:
- parameters = statement._ordered_values
-
- if self._no_parameters:
- self._no_parameters = False
- assert parameters is not None
- self._dict_parameters = dict(parameters)
- self._ordered_values = parameters
- self._parameter_ordering = [key for key, value in parameters]
- elif self._has_multi_parameters:
- self._cant_mix_formats_error()
- else:
- raise exc.InvalidRequestError(
- "Can only invoke ordered_values() once, and not mixed "
- "with any other values() call"
- )
def _process_select_values(self, statement: ValuesBase) -> None:
assert statement._select_names is not None
@@ -276,6 +221,12 @@ class DMLState(CompileState):
# does not allow this construction to occur
assert False, "This statement already has parameters"
+ def _no_multi_values_supported(self, statement: ValuesBase) -> NoReturn:
+ raise exc.InvalidRequestError(
+ "%s construct does not support "
+ "multiple parameter sets." % statement.__visit_name__.upper()
+ )
+
def _cant_mix_formats_error(self) -> NoReturn:
raise exc.InvalidRequestError(
"Can't mix single and multiple VALUES "
@@ -291,6 +242,8 @@ class InsertDMLState(DMLState):
include_table_with_column_exprs = False
+ _has_multi_parameters = False
+
def __init__(
self,
statement: Insert,
@@ -320,6 +273,37 @@ class InsertDMLState(DMLState):
for col in self._dict_parameters or ()
]
+ def _process_values(self, statement: ValuesBase) -> None:
+ if self._no_parameters:
+ self._has_multi_parameters = False
+ self._dict_parameters = statement._values
+ self._no_parameters = False
+ elif self._has_multi_parameters:
+ self._cant_mix_formats_error()
+
+ def _process_multi_values(self, statement: ValuesBase) -> None:
+ for parameters in statement._multi_values:
+ multi_parameters: List[MutableMapping[_DMLColumnElement, Any]] = [
+ {
+ c.key: value
+ for c, value in zip(statement.table.c, parameter_set)
+ }
+ if isinstance(parameter_set, collections_abc.Sequence)
+ else parameter_set
+ for parameter_set in parameters
+ ]
+
+ if self._no_parameters:
+ self._no_parameters = False
+ self._has_multi_parameters = True
+ self._multi_parameters = multi_parameters
+ self._dict_parameters = self._multi_parameters[0]
+ elif not self._has_multi_parameters:
+ self._cant_mix_formats_error()
+ else:
+ assert self._multi_parameters
+ self._multi_parameters.extend(multi_parameters)
+
@CompileState.plugin_for("default", "update")
class UpdateDMLState(DMLState):
@@ -336,7 +320,7 @@ class UpdateDMLState(DMLState):
elif statement._values is not None:
self._process_values(statement)
elif statement._multi_values:
- self._process_multi_values(statement)
+ self._no_multi_values_supported(statement)
t, ef = self._make_extra_froms(statement)
self._primary_table = t
self._extra_froms = ef
@@ -347,6 +331,21 @@ class UpdateDMLState(DMLState):
mt and compiler.render_table_with_column_in_update_from
)
+ def _process_ordered_values(self, statement: ValuesBase) -> None:
+ parameters = statement._ordered_values
+
+ if self._no_parameters:
+ self._no_parameters = False
+ assert parameters is not None
+ self._dict_parameters = dict(parameters)
+ self._ordered_values = parameters
+ self._parameter_ordering = [key for key, value in parameters]
+ else:
+ raise exc.InvalidRequestError(
+ "Can only invoke ordered_values() once, and not mixed "
+ "with any other values() call"
+ )
+
@CompileState.plugin_for("default", "delete")
class DeleteDMLState(DMLState):
@@ -897,18 +896,68 @@ class ValuesBase(UpdateBase):
return self
@_generative
- @_exclusive_against(
- "_returning",
- msgs={
- "_returning": "RETURNING is already configured on this statement"
- },
- defaults={"_returning": _returning},
- )
def return_defaults(
self: SelfValuesBase, *cols: _DMLColumnArgument
) -> SelfValuesBase:
"""Make use of a :term:`RETURNING` clause for the purpose
- of fetching server-side expressions and defaults.
+ of fetching server-side expressions and defaults, for supporting
+ backends only.
+
+ .. tip::
+
+ The :meth:`.ValuesBase.return_defaults` method is used by the ORM
+ for its internal work in fetching newly generated primary key
+            and server default values, in particular to provide the underlying
+ implementation of the :paramref:`_orm.Mapper.eager_defaults`
+ ORM feature. Its behavior is fairly idiosyncratic
+ and is not really intended for general use. End users should
+ stick with using :meth:`.UpdateBase.returning` in order to
+ add RETURNING clauses to their INSERT, UPDATE and DELETE
+ statements.
+
+        Normally, a single-row INSERT statement will automatically populate the
+ :attr:`.CursorResult.inserted_primary_key` attribute when executed,
+ which stores the primary key of the row that was just inserted in the
+ form of a :class:`.Row` object with column names as named tuple keys
+ (and the :attr:`.Row._mapping` view fully populated as well). The
+ dialect in use chooses the strategy to use in order to populate this
+ data; if it was generated using server-side defaults and / or SQL
+ expressions, dialect-specific approaches such as ``cursor.lastrowid``
+ or ``RETURNING`` are typically used to acquire the new primary key
+ value.
+
+ However, when the statement is modified by calling
+ :meth:`.ValuesBase.return_defaults` before executing the statement,
+ additional behaviors take place **only** for backends that support
+ RETURNING and for :class:`.Table` objects that maintain the
+ :paramref:`.Table.implicit_returning` parameter at its default value of
+ ``True``. In these cases, when the :class:`.CursorResult` is returned
+ from the statement's execution, not only will
+ :attr:`.CursorResult.inserted_primary_key` be populated as always, the
+ :attr:`.CursorResult.returned_defaults` attribute will also be
+ populated with a :class:`.Row` named-tuple representing the full range
+ of server generated
+ values from that single row, including values for any columns that
+ specify :paramref:`_schema.Column.server_default` or which make use of
+ :paramref:`_schema.Column.default` using a SQL expression.
+
+ When invoking INSERT statements with multiple rows using
+ :ref:`insertmanyvalues <engine_insertmanyvalues>`, the
+ :meth:`.ValuesBase.return_defaults` modifier will have the effect of
+ the :attr:`_engine.CursorResult.inserted_primary_key_rows` and
+ :attr:`_engine.CursorResult.returned_defaults_rows` attributes being
+ fully populated with lists of :class:`.Row` objects representing newly
+ inserted primary key values as well as newly inserted server generated
+ values for each row inserted. The
+ :attr:`.CursorResult.inserted_primary_key` and
+ :attr:`.CursorResult.returned_defaults` attributes will also continue
+ to be populated with the first row of these two collections.
+
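+        E.g., a minimal sketch of an executemany-style INSERT using
+        ``return_defaults()``; the table and parameter names here are
+        illustrative only::
+
+            result = connection.execute(
+                table.insert().return_defaults(),
+                [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
+            )
+            pk_rows = result.inserted_primary_key_rows
+            default_rows = result.returned_defaults_rows
+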
+ If the backend does not support RETURNING or the :class:`.Table` in use
+ has disabled :paramref:`.Table.implicit_returning`, then no RETURNING
+        clause is added and no additional data is fetched; however, the
+        INSERT or UPDATE statement proceeds normally.
+
E.g.::
@@ -918,64 +967,58 @@ class ValuesBase(UpdateBase):
server_created_at = result.returned_defaults['created_at']
- When used against a backend that supports RETURNING, all column
- values generated by SQL expression or server-side-default will be
- added to any existing RETURNING clause, provided that
- :meth:`.UpdateBase.returning` is not used simultaneously. The column
- values will then be available on the result using the
- :attr:`_engine.CursorResult.returned_defaults` accessor as
- a dictionary,
- referring to values keyed to the :class:`_schema.Column`
- object as well as
- its ``.key``.
-
- This method differs from :meth:`.UpdateBase.returning` in these ways:
-
- 1. :meth:`.ValuesBase.return_defaults` is only intended for use with an
- INSERT or an UPDATE statement that matches exactly one row per
- parameter set. While the RETURNING construct in the general sense
- supports multiple rows for a multi-row UPDATE or DELETE statement,
- or for special cases of INSERT that return multiple rows (e.g.
- INSERT from SELECT, multi-valued VALUES clause),
- :meth:`.ValuesBase.return_defaults` is intended only for an
- "ORM-style" single-row INSERT/UPDATE statement. The row
- returned by the statement is also consumed implicitly when
- :meth:`.ValuesBase.return_defaults` is used. By contrast,
- :meth:`.UpdateBase.returning` leaves the RETURNING result-set intact
- with a collection of any number of rows.
-
- 2. It is compatible with the existing logic to fetch auto-generated
- primary key values, also known as "implicit returning". Backends
- that support RETURNING will automatically make use of RETURNING in
- order to fetch the value of newly generated primary keys; while the
- :meth:`.UpdateBase.returning` method circumvents this behavior,
- :meth:`.ValuesBase.return_defaults` leaves it intact.
-
- 3. It can be called against any backend. Backends that don't support
- RETURNING will skip the usage of the feature, rather than raising
- an exception. The return value of
- :attr:`_engine.CursorResult.returned_defaults` will be ``None``
+
+ The :meth:`.ValuesBase.return_defaults` method is mutually exclusive
+ against the :meth:`.UpdateBase.returning` method and errors will be
+ raised during the SQL compilation process if both are used at the same
+ time on one statement. The RETURNING clause of the INSERT or UPDATE
+ statement is therefore controlled by only one of these methods at a
+ time.
+
+ The :meth:`.ValuesBase.return_defaults` method differs from
+ :meth:`.UpdateBase.returning` in these ways:
+
+        1. The :meth:`.ValuesBase.return_defaults` method causes the
+ :attr:`.CursorResult.returned_defaults` collection to be populated
+ with the first row from the RETURNING result. This attribute is not
+ populated when using :meth:`.UpdateBase.returning`.
+
+ 2. :meth:`.ValuesBase.return_defaults` is compatible with existing
+ logic used to fetch auto-generated primary key values that are then
+ populated into the :attr:`.CursorResult.inserted_primary_key`
+ attribute. By contrast, using :meth:`.UpdateBase.returning` will
+ have the effect of the :attr:`.CursorResult.inserted_primary_key`
+ attribute being left unpopulated.
+
+ 3. :meth:`.ValuesBase.return_defaults` can be called against any
+ backend. Backends that don't support RETURNING will skip the usage
+ of the feature, rather than raising an exception. The return value
+ of :attr:`_engine.CursorResult.returned_defaults` will be ``None``
+ for backends that don't support RETURNING or for which the target
+ :class:`.Table` sets :paramref:`.Table.implicit_returning` to
+ ``False``.
4. An INSERT statement invoked with executemany() is supported if the
backend database driver supports the
- ``insert_executemany_returning`` feature, currently this includes
- PostgreSQL with psycopg2. When executemany is used, the
+ :ref:`insertmanyvalues <engine_insertmanyvalues>`
+        feature, which is now supported by most SQLAlchemy-included backends.
+ When executemany is used, the
:attr:`_engine.CursorResult.returned_defaults_rows` and
:attr:`_engine.CursorResult.inserted_primary_key_rows` accessors
will return the inserted defaults and primary keys.
- .. versionadded:: 1.4
+ .. versionadded:: 1.4 Added
+ :attr:`_engine.CursorResult.returned_defaults_rows` and
+ :attr:`_engine.CursorResult.inserted_primary_key_rows` accessors.
+ In version 2.0, the underlying implementation which fetches and
+ populates the data for these attributes was generalized to be
+ supported by most backends, whereas in 1.4 they were only
+ supported by the ``psycopg2`` driver.
- :meth:`.ValuesBase.return_defaults` is used by the ORM to provide
- an efficient implementation for the ``eager_defaults`` feature of
- :class:`_orm.Mapper`.
:param cols: optional list of column key names or
- :class:`_schema.Column`
- objects. If omitted, all column expressions evaluated on the server
- are added to the returning list.
-
- .. versionadded:: 0.9.0
+          :class:`_schema.Column` objects that act as a filter for those
+          columns that will be fetched.
.. seealso::
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index 3320214a2..565537109 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -647,8 +647,9 @@ class Table(
server side defaults, on those backends which support RETURNING.
In modern SQLAlchemy there is generally no reason to alter this
- setting, except in the case of some backends such as SQL Server
- when INSERT triggers are used for that table.
+ setting, except for some backend specific cases
+ (see :ref:`mssql_triggers` in the SQL Server dialect documentation
+ for one such example).
:param include_columns: A list of strings indicating a subset of
columns to be loaded via the ``autoload`` operation; table columns who
@@ -3158,7 +3159,6 @@ class ScalarElementColumnDefault(ColumnDefault):
)
-# _SQLExprDefault = Union["ColumnElement[Any]", "TextClause", "SelectBase"]
_SQLExprDefault = Union["ColumnElement[Any]", "TextClause"]
diff --git a/lib/sqlalchemy/sql/util.py b/lib/sqlalchemy/sql/util.py
index 0400ab3fe..55c6a35f8 100644
--- a/lib/sqlalchemy/sql/util.py
+++ b/lib/sqlalchemy/sql/util.py
@@ -598,6 +598,21 @@ class _repr_row(_repr_base):
)
+class _long_statement(str):
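+    # a ``str`` subclass used for logging / error display; ``__str__``
+    # truncates statements longer than 500 characters, keeping the
+    # first 250 and the last 100 characters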
+ def __str__(self) -> str:
+ lself = len(self)
+ if lself > 500:
+ lleft = 250
+ lright = 100
+ trunc = lself - lleft - lright
+ return (
+ f"{self[0:lleft]} ... {trunc} "
+ f"characters truncated ... {self[-lright:]}"
+ )
+ else:
+ return str.__str__(self)
+
+
class _repr_params(_repr_base):
"""Provide a string view of bound parameters.
@@ -606,12 +621,13 @@ class _repr_params(_repr_base):
"""
- __slots__ = "params", "batches", "ismulti"
+ __slots__ = "params", "batches", "ismulti", "max_params"
def __init__(
self,
params: Optional[_AnyExecuteParams],
batches: int,
+ max_params: int = 100,
max_chars: int = 300,
ismulti: Optional[bool] = None,
):
@@ -619,6 +635,7 @@ class _repr_params(_repr_base):
self.ismulti = ismulti
self.batches = batches
self.max_chars = max_chars
+ self.max_params = max_params
def __repr__(self) -> str:
if self.ismulti is None:
@@ -693,29 +710,110 @@ class _repr_params(_repr_base):
else:
return "(%s)" % elements
+ def _get_batches(self, params: Iterable[Any]) -> Any:
+
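+        # e.g. with self.max_params at 4, [1, 2, 3, 4, 5, 6] returns
+        # ([1, 2], [5, 6], 2), i.e. leading batch, trailing batch, number
+        # of parameters truncated; below the threshold,
+        # (params, None, None) is returned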
+ lparams = list(params)
+ lenparams = len(lparams)
+ if lenparams > self.max_params:
+ lleft = self.max_params // 2
+ return (
+ lparams[0:lleft],
+ lparams[-lleft:],
+ lenparams - self.max_params,
+ )
+ else:
+ return lparams, None, None
+
def _repr_params(
self,
params: _AnySingleExecuteParams,
typ: int,
) -> str:
- trunc = self.trunc
if typ is self._DICT:
- return "{%s}" % (
+ return self._repr_param_dict(
+ cast("_CoreSingleExecuteParams", params)
+ )
+ elif typ is self._TUPLE:
+ return self._repr_param_tuple(cast("Sequence[Any]", params))
+ else:
+ return self._repr_param_list(params)
+
+ def _repr_param_dict(self, params: _CoreSingleExecuteParams) -> str:
+ trunc = self.trunc
+ (
+ items_first_batch,
+ items_second_batch,
+ trunclen,
+ ) = self._get_batches(params.items())
+
+ if items_second_batch:
+ text = "{%s" % (
", ".join(
- "%r: %s" % (key, trunc(value))
- for key, value in cast(
- "_CoreSingleExecuteParams", params
- ).items()
+ f"{key!r}: {trunc(value)}"
+ for key, value in items_first_batch
)
)
- elif typ is self._TUPLE:
- seq_params = cast("Sequence[Any]", params)
- return "(%s%s)" % (
- ", ".join(trunc(value) for value in seq_params),
- "," if len(seq_params) == 1 else "",
+ text += f" ... {trunclen} parameters truncated ... "
+ text += "%s}" % (
+ ", ".join(
+ f"{key!r}: {trunc(value)}"
+ for key, value in items_second_batch
+ )
)
else:
- return "[%s]" % (", ".join(trunc(value) for value in params))
+ text = "{%s}" % (
+ ", ".join(
+ f"{key!r}: {trunc(value)}"
+ for key, value in items_first_batch
+ )
+ )
+ return text
+
+ def _repr_param_tuple(self, params: "Sequence[Any]") -> str:
+ trunc = self.trunc
+
+ (
+ items_first_batch,
+ items_second_batch,
+ trunclen,
+ ) = self._get_batches(params)
+
+ if items_second_batch:
+ text = "(%s" % (
+ ", ".join(trunc(value) for value in items_first_batch)
+ )
+ text += f" ... {trunclen} parameters truncated ... "
+ text += "%s)" % (
+ ", ".join(trunc(value) for value in items_second_batch),
+ )
+ else:
+ text = "(%s%s)" % (
+ ", ".join(trunc(value) for value in items_first_batch),
+ "," if len(items_first_batch) == 1 else "",
+ )
+ return text
+
+ def _repr_param_list(self, params: _AnySingleExecuteParams) -> str:
+ trunc = self.trunc
+ (
+ items_first_batch,
+ items_second_batch,
+ trunclen,
+ ) = self._get_batches(params)
+
+ if items_second_batch:
+ text = "[%s" % (
+ ", ".join(trunc(value) for value in items_first_batch)
+ )
+ text += f" ... {trunclen} parameters truncated ... "
+ text += "%s]" % (
+ ", ".join(trunc(value) for value in items_second_batch)
+ )
+ else:
+ text = "[%s]" % (
+ ", ".join(trunc(value) for value in items_first_batch)
+ )
+ return text
def adapt_criterion_to_null(crit: _CE, nulls: Collection[Any]) -> _CE:
diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py
index 7ba89b505..a8650f222 100644
--- a/lib/sqlalchemy/testing/provision.py
+++ b/lib/sqlalchemy/testing/provision.py
@@ -459,3 +459,19 @@ def set_default_schema_on_connection(cfg, dbapi_connection, schema_name):
"backend does not implement a schema name set function: %s"
% (cfg.db.url,)
)
+
+
+@register.init
+def upsert(cfg, table, returning, set_lambda=None):
+ """return the backends insert..on conflict / on dupe etc. construct.
+
+ while we should add a backend-neutral upsert construct as well, such as
+ insert().upsert(), it's important that we continue to test the
+ backend-specific insert() constructs since if we do implement
+ insert().upsert(), that would be using a different codepath for the things
+ we need to test like insertmanyvalues, etc.
+
+ """
+ raise NotImplementedError(
+ f"backend does not include an upsert implementation: {cfg.db.url}"
+ )
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index 874383394..3a0fc818d 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -424,6 +424,15 @@ class SuiteRequirements(Requirements):
)
@property
+ def insertmanyvalues(self):
+ return exclusions.only_if(
+ lambda config: config.db.dialect.supports_multivalues_insert
+ and config.db.dialect.insert_returning
+ and config.db.dialect.use_insertmanyvalues,
+ "%(database)s %(does_support)s 'insertmanyvalues functionality",
+ )
+
+ @property
def tuple_in(self):
"""Target platform supports the syntax
"(x, y) IN ((x1, y1), (x2, y2), ...)"
diff --git a/lib/sqlalchemy/testing/suite/test_dialect.py b/lib/sqlalchemy/testing/suite/test_dialect.py
index bb2dd6574..efad81930 100644
--- a/lib/sqlalchemy/testing/suite/test_dialect.py
+++ b/lib/sqlalchemy/testing/suite/test_dialect.py
@@ -11,6 +11,7 @@ from .. import fixtures
from .. import is_true
from .. import ne_
from .. import provide_metadata
+from ..assertions import expect_raises
from ..assertions import expect_raises_message
from ..config import requirements
from ..provision import set_default_schema_on_connection
@@ -412,3 +413,156 @@ class DifficultParametersTest(fixtures.TestBase):
# name works as the key from cursor.description
eq_(row._mapping[name], "some name")
+
+
+class ReturningGuardsTest(fixtures.TablesTest):
+ """test that the various 'returning' flags are set appropriately"""
+
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+
+ Table(
+ "t",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ )
+
+ @testing.fixture
+ def run_stmt(self, connection):
+ t = self.tables.t
+
+ def go(stmt, executemany, id_param_name, expect_success):
+ stmt = stmt.returning(t.c.id)
+
+ if executemany:
+ if not expect_success:
+ # for RETURNING executemany(), we raise our own
+ # error as this is independent of general RETURNING
+ # support
+ with expect_raises_message(
+ exc.StatementError,
+ rf"Dialect {connection.dialect.name}\+"
+ f"{connection.dialect.driver} with "
+ f"current server capabilities does not support "
+ f".*RETURNING when executemany is used",
+ ):
+ result = connection.execute(
+ stmt,
+ [
+ {id_param_name: 1, "data": "d1"},
+ {id_param_name: 2, "data": "d2"},
+ {id_param_name: 3, "data": "d3"},
+ ],
+ )
+ else:
+ result = connection.execute(
+ stmt,
+ [
+ {id_param_name: 1, "data": "d1"},
+ {id_param_name: 2, "data": "d2"},
+ {id_param_name: 3, "data": "d3"},
+ ],
+ )
+ eq_(result.all(), [(1,), (2,), (3,)])
+ else:
+ if not expect_success:
+ # for RETURNING execute(), we pass all the way to the DB
+ # and let it fail
+ with expect_raises(exc.DBAPIError):
+ connection.execute(
+ stmt, {id_param_name: 1, "data": "d1"}
+ )
+ else:
+ result = connection.execute(
+ stmt, {id_param_name: 1, "data": "d1"}
+ )
+ eq_(result.all(), [(1,)])
+
+ return go
+
+ def test_insert_single(self, connection, run_stmt):
+ t = self.tables.t
+
+ stmt = t.insert()
+
+ run_stmt(stmt, False, "id", connection.dialect.insert_returning)
+
+ def test_insert_many(self, connection, run_stmt):
+ t = self.tables.t
+
+ stmt = t.insert()
+
+ run_stmt(
+ stmt, True, "id", connection.dialect.insert_executemany_returning
+ )
+
+ def test_update_single(self, connection, run_stmt):
+ t = self.tables.t
+
+ connection.execute(
+ t.insert(),
+ [
+ {"id": 1, "data": "d1"},
+ {"id": 2, "data": "d2"},
+ {"id": 3, "data": "d3"},
+ ],
+ )
+
+ stmt = t.update().where(t.c.id == bindparam("b_id"))
+
+ run_stmt(stmt, False, "b_id", connection.dialect.update_returning)
+
+ def test_update_many(self, connection, run_stmt):
+ t = self.tables.t
+
+ connection.execute(
+ t.insert(),
+ [
+ {"id": 1, "data": "d1"},
+ {"id": 2, "data": "d2"},
+ {"id": 3, "data": "d3"},
+ ],
+ )
+
+ stmt = t.update().where(t.c.id == bindparam("b_id"))
+
+ run_stmt(
+ stmt, True, "b_id", connection.dialect.update_executemany_returning
+ )
+
+ def test_delete_single(self, connection, run_stmt):
+ t = self.tables.t
+
+ connection.execute(
+ t.insert(),
+ [
+ {"id": 1, "data": "d1"},
+ {"id": 2, "data": "d2"},
+ {"id": 3, "data": "d3"},
+ ],
+ )
+
+ stmt = t.delete().where(t.c.id == bindparam("b_id"))
+
+ run_stmt(stmt, False, "b_id", connection.dialect.delete_returning)
+
+ def test_delete_many(self, connection, run_stmt):
+ t = self.tables.t
+
+ connection.execute(
+ t.insert(),
+ [
+ {"id": 1, "data": "d1"},
+ {"id": 2, "data": "d2"},
+ {"id": 3, "data": "d3"},
+ ],
+ )
+
+ stmt = t.delete().where(t.c.id == bindparam("b_id"))
+
+ run_stmt(
+ stmt, True, "b_id", connection.dialect.delete_executemany_returning
+ )
diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py
index 2307d3b3f..ae54f6bcd 100644
--- a/lib/sqlalchemy/testing/suite/test_insert.py
+++ b/lib/sqlalchemy/testing/suite/test_insert.py
@@ -338,6 +338,7 @@ class ReturningTest(fixtures.TablesTest):
r = connection.execute(
table.insert().returning(table.c.id), dict(data="some data")
)
+
pk = r.first()[0]
fetched_pk = connection.scalar(select(table.c.id))
eq_(fetched_pk, pk)
@@ -357,5 +358,25 @@ class ReturningTest(fixtures.TablesTest):
pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
eq_(r.inserted_primary_key, (pk,))
+ @requirements.insert_executemany_returning
+ def test_insertmanyvalues_returning(self, connection):
+ r = connection.execute(
+ self.tables.autoinc_pk.insert().returning(
+ self.tables.autoinc_pk.c.id
+ ),
+ [
+ {"data": "d1"},
+ {"data": "d2"},
+ {"data": "d3"},
+ {"data": "d4"},
+ {"data": "d5"},
+ ],
+ )
+ rall = r.all()
+
+ pks = connection.execute(select(self.tables.autoinc_pk.c.id))
+
+ eq_(rall, pks.all())
+
__all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")
diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py
index 59e9cc7f4..7d79c67ae 100644
--- a/lib/sqlalchemy/testing/suite/test_results.py
+++ b/lib/sqlalchemy/testing/suite/test_results.py
@@ -164,6 +164,26 @@ class PercentSchemaNamesTest(fixtures.TablesTest):
)
self._assert_table(connection)
+ @requirements.insert_executemany_returning
+ def test_executemany_returning_roundtrip(self, connection):
+ percent_table = self.tables.percent_table
+ connection.execute(
+ percent_table.insert(), {"percent%": 5, "spaces % more spaces": 12}
+ )
+ result = connection.execute(
+ percent_table.insert().returning(
+ percent_table.c["percent%"],
+ percent_table.c["spaces % more spaces"],
+ ),
+ [
+ {"percent%": 7, "spaces % more spaces": 11},
+ {"percent%": 9, "spaces % more spaces": 10},
+ {"percent%": 11, "spaces % more spaces": 9},
+ ],
+ )
+ eq_(result.all(), [(7, 11), (9, 10), (11, 9)])
+ self._assert_table(connection)
+
def _assert_table(self, conn):
percent_table = self.tables.percent_table
lightweight_percent_table = self.tables.lightweight_percent_table