diff options
| -rw-r--r-- | doc/build/changelog/migration_12.rst | 27 | ||||
| -rw-r--r-- | doc/build/faq/performance.rst | 6 | ||||
| -rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 46 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 83 |
4 files changed, 160 insertions, 2 deletions
diff --git a/doc/build/changelog/migration_12.rst b/doc/build/changelog/migration_12.rst index 4ded828c4..598435ae8 100644 --- a/doc/build/changelog/migration_12.rst +++ b/doc/build/changelog/migration_12.rst @@ -1351,6 +1351,33 @@ is already applied. Dialect Improvements and Changes - PostgreSQL ============================================= +.. _change_4109: + +Support for Batch Mode / Fast Execution Helpers +------------------------------------------------ + +The psycopg2 ``cursor.executemany()`` method has been identified as performing +poorly, particularly with INSERT statements. To alleviate this, psycopg2 +has added `Fast Execution Helpers <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_ +which rework statements such as INSERT statements into fewer SQL calls with +multiple VALUES clauses. SQLAlchemy 1.2 now includes support for these +helpers to be used transparently whenever the :class:`.Engine` makes use +of ``cursor.executemany()`` to invoke a statement against multiple parameter +sets. The feature is off by default and can be enabled using the +``use_batch_mode`` argument on :func:`.create_engine`:: + + engine = create_engine( + "postgresql+psycopg2://scott:tiger@host/dbname", + use_batch_mode=True) + +The feature is considered to be experimental for the moment but may become +on by default in a future release. + +.. seealso:: + + :ref:`psycopg2_batch_mode` + + .. _change_3959: Support for fields specification in INTERVAL, including full reflection diff --git a/doc/build/faq/performance.rst b/doc/build/faq/performance.rst index d6ee557ea..fa298bdad 100644 --- a/doc/build/faq/performance.rst +++ b/doc/build/faq/performance.rst @@ -298,6 +298,12 @@ SQL generation and execution system that the ORM builds on top of is part of the :doc:`Core <core/tutorial>`. Using this system directly, we can produce an INSERT that is competitive with using the raw database API directly. +.. note:: + + When using the psycopg2 dialect, consider making use of the + :ref:`batch execution helpers <psycopg2_batch_mode>` feature of psycopg2, + now supported directly by the SQLAlchemy psycopg2 dialect. + Alternatively, the SQLAlchemy ORM offers the :ref:`bulk_operations` suite of methods, which provide hooks into subsections of the unit of work process in order to emit Core-level INSERT and UPDATE constructs with diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 3e2968d91..8ac39c201 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -53,6 +53,15 @@ psycopg2-specific keyword arguments which are accepted by :ref:`psycopg2_unicode` +* ``use_batch_mode``: This flag allows ``psycopg2.extras.execute_batch`` + for ``cursor.executemany()`` calls performed by the :class:`.Engine`. + It is currently experimental but + may well become True by default as it is critical for executemany performance. + + .. seealso:: + + :ref:`psycopg2_batch_mode` + Unix Domain Connections ------------------------ @@ -101,6 +110,31 @@ The following DBAPI-specific options are respected when used with .. versionadded:: 1.0.6 +.. _psycopg2_batch_mode: + +Psycopg2 Batch Mode (Fast Execution) +------------------------------------ + +Modern versions of psycopg2 include a feature known as +`Fast Execution Helpers <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, +which have been shown in benchmarking to improve psycopg2's executemany() +performance with INSERTS by multiple orders of magnitude. SQLAlchemy +allows this extension to be used for all ``executemany()`` style calls +invoked by an :class:`.Engine` when used with multiple parameter sets, +by adding the ``use_batch_mode`` flag to :func:`.create_engine`:: + + engine = create_engine( + "postgresql+psycopg2://scott:tiger@host/dbname", + use_batch_mode=True) + +Batch mode is considered to be **experimental** at this time, however may +be enabled by default in a future release. + + +.. versionadded:: 1.2.0 + + + .. _psycopg2_unicode: Unicode with Psycopg2 @@ -510,6 +544,7 @@ class PGDialect_psycopg2(PGDialect): def __init__(self, server_side_cursors=False, use_native_unicode=True, client_encoding=None, use_native_hstore=True, use_native_uuid=True, + use_batch_mode=False, **kwargs): PGDialect.__init__(self, **kwargs) self.server_side_cursors = server_side_cursors @@ -518,6 +553,7 @@ class PGDialect_psycopg2(PGDialect): self.use_native_uuid = use_native_uuid self.supports_unicode_binds = use_native_unicode self.client_encoding = client_encoding + self.psycopg2_batch_mode = use_batch_mode if self.dbapi and hasattr(self.dbapi, '__version__'): m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?', self.dbapi.__version__) @@ -540,7 +576,8 @@ class PGDialect_psycopg2(PGDialect): # http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9 self.supports_sane_multi_rowcount = \ self.psycopg2_version >= \ - self.FEATURE_VERSION_MAP['sane_multi_rowcount'] + self.FEATURE_VERSION_MAP['sane_multi_rowcount'] and \ + not self.psycopg2_batch_mode @classmethod def dbapi(cls): @@ -638,6 +675,13 @@ class PGDialect_psycopg2(PGDialect): else: return None + def do_executemany(self, cursor, statement, parameters, context=None): + if self.psycopg2_batch_mode: + extras = self._psycopg2_extras() + extras.execute_batch(cursor, statement, parameters) + else: + cursor.executemany(statement, parameters) + @util.memoized_instancemethod def _hstore_oids(self, conn): if self.psycopg2_version >= self.FEATURE_VERSION_MAP['hstore_adapter']: diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index e985718c7..29aa62e3f 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -9,7 +9,7 @@ import datetime from sqlalchemy import ( Table, Column, select, MetaData, text, Integer, String, Sequence, Numeric, DateTime, BigInteger, func, extract, SmallInteger, TypeDecorator, literal, - cast) + cast, bindparam) from sqlalchemy import exc, schema from sqlalchemy.dialects.postgresql import base as postgresql import logging @@ -84,6 +84,87 @@ class DialectTest(fixtures.TestBase): eq_(e.dialect.use_native_unicode, True) +class BatchInsertsTest(fixtures.TablesTest): + __only_on__ = 'postgresql+psycopg2' + __backend__ = True + + run_create_tables = "each" + + @classmethod + def define_tables(cls, metadata): + Table( + 'data', metadata, + Column('id', Integer, primary_key=True), + Column('x', String), + Column('y', String), + Column('z', Integer, server_default="5") + ) + + def setup(self): + super(BatchInsertsTest, self).setup() + self.engine = engines.testing_engine(options={"use_batch_mode": True}) + + def teardown(self): + self.engine.dispose() + super(BatchInsertsTest, self).teardown() + + def test_insert(self): + with self.engine.connect() as conn: + conn.execute( + self.tables.data.insert(), + [ + {"x": "x1", "y": "y1"}, + {"x": "x2", "y": "y2"}, + {"x": "x3", "y": "y3"} + ] + ) + + eq_( + conn.execute(select([self.tables.data])).fetchall(), + [ + (1, "x1", "y1", 5), + (2, "x2", "y2", 5), + (3, "x3", "y3", 5) + ] + ) + + def test_not_sane_rowcount(self): + self.engine.connect().close() + assert not self.engine.dialect.supports_sane_multi_rowcount + + def test_update(self): + with self.engine.connect() as conn: + conn.execute( + self.tables.data.insert(), + [ + {"x": "x1", "y": "y1"}, + {"x": "x2", "y": "y2"}, + {"x": "x3", "y": "y3"} + ] + ) + + conn.execute( + self.tables.data.update(). + where(self.tables.data.c.x == bindparam('xval')). + values(y=bindparam('yval')), + [ + {"xval": "x1", "yval": "y5"}, + {"xval": "x3", "yval": "y6"} + ] + ) + eq_( + conn.execute( + select([self.tables.data]). + order_by(self.tables.data.c.id)). + fetchall(), + [ + (1, "x1", "y5", 5), + (2, "x2", "y2", 5), + (3, "x3", "y6", 5) + ] + ) + + class MiscBackendTest( fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): |
