summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2017-10-10 13:33:59 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2017-10-10 13:43:35 -0400
commit4fbeec9d61b6bcbb40ef931b2aa2c82bd6bc8379 (patch)
tree3339c443406557f1f71d920258565ab0566f7c59
parent2b2cdee7994d4af8dbd3dab28a5588c02e974fc8 (diff)
downloadsqlalchemy-4fbeec9d61b6bcbb40ef931b2aa2c82bd6bc8379.tar.gz
Add fast execution helper support.
Added a new flag ``use_batch_mode`` to the psycopg2 dialect. This flag enables the use of psycopg2's ``psycopg2.extras.execute_batch`` extension when the :class:`.Engine` calls upon ``cursor.executemany()``. This extension provides a critical performance increase by over an order of magnitude when running INSERT statements in batch. The flag is False by default as it is considered to be experimental for now. Change-Id: Ib88d28bc792958d47109f644ff1d08c897db4ff7 Fixes: #4109
-rw-r--r--doc/build/changelog/migration_12.rst27
-rw-r--r--doc/build/faq/performance.rst6
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2.py46
-rw-r--r--test/dialect/postgresql/test_dialect.py83
4 files changed, 160 insertions, 2 deletions
diff --git a/doc/build/changelog/migration_12.rst b/doc/build/changelog/migration_12.rst
index 4ded828c4..598435ae8 100644
--- a/doc/build/changelog/migration_12.rst
+++ b/doc/build/changelog/migration_12.rst
@@ -1351,6 +1351,33 @@ is already applied.
Dialect Improvements and Changes - PostgreSQL
=============================================
+.. _change_4109:
+
+Support for Batch Mode / Fast Execution Helpers
+------------------------------------------------
+
+The psycopg2 ``cursor.executemany()`` method has been identified as performing
+poorly, particularly with INSERT statements. To alleviate this, psycopg2
+has added `Fast Execution Helpers <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_
+which rework statements such as INSERT statements into fewer SQL calls with
+multiple VALUES clauses. SQLAlchemy 1.2 now includes support for these
+helpers to be used transparently whenever the :class:`.Engine` makes use
+of ``cursor.executemany()`` to invoke a statement against multiple parameter
+sets. The feature is off by default and can be enabled using the
+``use_batch_mode`` argument on :func:`.create_engine`::
+
+ engine = create_engine(
+ "postgresql+psycopg2://scott:tiger@host/dbname",
+ use_batch_mode=True)
+
+The feature is considered to be experimental for the moment but may become
+on by default in a future release.
+
+.. seealso::
+
+ :ref:`psycopg2_batch_mode`
+
+
.. _change_3959:
Support for fields specification in INTERVAL, including full reflection
diff --git a/doc/build/faq/performance.rst b/doc/build/faq/performance.rst
index d6ee557ea..fa298bdad 100644
--- a/doc/build/faq/performance.rst
+++ b/doc/build/faq/performance.rst
@@ -298,6 +298,12 @@ SQL generation and execution system that the ORM builds on top of
is part of the :doc:`Core <core/tutorial>`. Using this system directly, we can produce an INSERT that
is competitive with using the raw database API directly.
+.. note::
+
+ When using the psycopg2 dialect, consider making use of the
+ :ref:`batch execution helpers <psycopg2_batch_mode>` feature of psycopg2,
+ now supported directly by the SQLAlchemy psycopg2 dialect.
+
Alternatively, the SQLAlchemy ORM offers the :ref:`bulk_operations`
suite of methods, which provide hooks into subsections of the unit of
work process in order to emit Core-level INSERT and UPDATE constructs with
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index 3e2968d91..8ac39c201 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -53,6 +53,15 @@ psycopg2-specific keyword arguments which are accepted by
:ref:`psycopg2_unicode`
+* ``use_batch_mode``: This flag allows ``psycopg2.extras.execute_batch``
+ for ``cursor.executemany()`` calls performed by the :class:`.Engine`.
+ It is currently experimental but
+ may well become True by default as it is critical for executemany performance.
+
+ .. seealso::
+
+ :ref:`psycopg2_batch_mode`
+
Unix Domain Connections
------------------------
@@ -101,6 +110,31 @@ The following DBAPI-specific options are respected when used with
.. versionadded:: 1.0.6
+.. _psycopg2_batch_mode:
+
+Psycopg2 Batch Mode (Fast Execution)
+------------------------------------
+
+Modern versions of psycopg2 include a feature known as
+`Fast Execution Helpers <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_,
+which have been shown in benchmarking to improve psycopg2's executemany()
+performance with INSERTS by multiple orders of magnitude. SQLAlchemy
+allows this extension to be used for all ``executemany()`` style calls
+invoked by an :class:`.Engine` when used with multiple parameter sets,
+by adding the ``use_batch_mode`` flag to :func:`.create_engine`::
+
+ engine = create_engine(
+ "postgresql+psycopg2://scott:tiger@host/dbname",
+ use_batch_mode=True)
+
+Batch mode is considered to be **experimental** at this time, however may
+be enabled by default in a future release.
+
+
+.. versionadded:: 1.2.0
+
+
+
.. _psycopg2_unicode:
Unicode with Psycopg2
@@ -510,6 +544,7 @@ class PGDialect_psycopg2(PGDialect):
def __init__(self, server_side_cursors=False, use_native_unicode=True,
client_encoding=None,
use_native_hstore=True, use_native_uuid=True,
+ use_batch_mode=False,
**kwargs):
PGDialect.__init__(self, **kwargs)
self.server_side_cursors = server_side_cursors
@@ -518,6 +553,7 @@ class PGDialect_psycopg2(PGDialect):
self.use_native_uuid = use_native_uuid
self.supports_unicode_binds = use_native_unicode
self.client_encoding = client_encoding
+ self.psycopg2_batch_mode = use_batch_mode
if self.dbapi and hasattr(self.dbapi, '__version__'):
m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?',
self.dbapi.__version__)
@@ -540,7 +576,8 @@ class PGDialect_psycopg2(PGDialect):
# http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9
self.supports_sane_multi_rowcount = \
self.psycopg2_version >= \
- self.FEATURE_VERSION_MAP['sane_multi_rowcount']
+ self.FEATURE_VERSION_MAP['sane_multi_rowcount'] and \
+ not self.psycopg2_batch_mode
@classmethod
def dbapi(cls):
@@ -638,6 +675,13 @@ class PGDialect_psycopg2(PGDialect):
else:
return None
+ def do_executemany(self, cursor, statement, parameters, context=None):
+ if self.psycopg2_batch_mode:
+ extras = self._psycopg2_extras()
+ extras.execute_batch(cursor, statement, parameters)
+ else:
+ cursor.executemany(statement, parameters)
+
@util.memoized_instancemethod
def _hstore_oids(self, conn):
if self.psycopg2_version >= self.FEATURE_VERSION_MAP['hstore_adapter']:
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index e985718c7..29aa62e3f 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -9,7 +9,7 @@ import datetime
from sqlalchemy import (
Table, Column, select, MetaData, text, Integer, String, Sequence, Numeric,
DateTime, BigInteger, func, extract, SmallInteger, TypeDecorator, literal,
- cast)
+ cast, bindparam)
from sqlalchemy import exc, schema
from sqlalchemy.dialects.postgresql import base as postgresql
import logging
@@ -84,6 +84,87 @@ class DialectTest(fixtures.TestBase):
eq_(e.dialect.use_native_unicode, True)
+class BatchInsertsTest(fixtures.TablesTest):
+ __only_on__ = 'postgresql+psycopg2'
+ __backend__ = True
+
+ run_create_tables = "each"
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'data', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('x', String),
+ Column('y', String),
+ Column('z', Integer, server_default="5")
+ )
+
+ def setup(self):
+ super(BatchInsertsTest, self).setup()
+ self.engine = engines.testing_engine(options={"use_batch_mode": True})
+
+ def teardown(self):
+ self.engine.dispose()
+ super(BatchInsertsTest, self).teardown()
+
+ def test_insert(self):
+ with self.engine.connect() as conn:
+ conn.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"}
+ ]
+ )
+
+ eq_(
+ conn.execute(select([self.tables.data])).fetchall(),
+ [
+ (1, "x1", "y1", 5),
+ (2, "x2", "y2", 5),
+ (3, "x3", "y3", 5)
+ ]
+ )
+
+ def test_not_sane_rowcount(self):
+ self.engine.connect().close()
+ assert not self.engine.dialect.supports_sane_multi_rowcount
+
+ def test_update(self):
+ with self.engine.connect() as conn:
+ conn.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"}
+ ]
+ )
+
+ conn.execute(
+ self.tables.data.update().
+ where(self.tables.data.c.x == bindparam('xval')).
+ values(y=bindparam('yval')),
+ [
+ {"xval": "x1", "yval": "y5"},
+ {"xval": "x3", "yval": "y6"}
+ ]
+ )
+ eq_(
+ conn.execute(
+ select([self.tables.data]).
+ order_by(self.tables.data.c.id)).
+ fetchall(),
+ [
+ (1, "x1", "y5", 5),
+ (2, "x2", "y2", 5),
+ (3, "x3", "y6", 5)
+ ]
+ )
+
+
class MiscBackendTest(
fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):