diff options
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/dialects/mysql/base.py | 22 | ||||
| -rw-r--r-- | lib/sqlalchemy/dialects/mysql/mysqldb.py | 18 | ||||
| -rw-r--r-- | lib/sqlalchemy/dialects/mysql/pymysql.py | 15 | ||||
| -rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 48 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 46 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/query.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/requirements.py | 8 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_results.py | 149 |
9 files changed, 269 insertions, 43 deletions
diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index e7e533890..449fffaba 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -177,6 +177,22 @@ multi-column key for some storage engines:: Column('id', Integer, primary_key=True) ) +.. _mysql_ss_cursors: + +Server Side Cursors +------------------- + +Server-side cursor support is available for the MySQLdb and PyMySQL dialects. +From a MySQL point of view this means that the ``MySQLdb.cursors.SSCursor`` or +``pymysql.cursors.SSCursor`` class is used when building up the cursor which +will receive results. The most typical way of invoking this feature is via the +:paramref:`.Connection.execution_options.stream_results` connection execution +option. Server side cursors can also be enabled for all SELECT statements +unconditionally by passing ``server_side_cursors=True`` to +:func:`.create_engine`. + +.. versionadded:: 1.1.4 - added server-side cursor support. + .. _mysql_unicode: Unicode @@ -743,6 +759,12 @@ class MySQLExecutionContext(default.DefaultExecutionContext): def should_autocommit_text(self, statement): return AUTOCOMMIT_RE.match(statement) + def create_server_side_cursor(self): + if self.dialect.supports_server_side_cursors: + return self._dbapi_connection.cursor(self.dialect._sscursor) + else: + raise NotImplementedError() + class MySQLCompiler(compiler.SQLCompiler): diff --git a/lib/sqlalchemy/dialects/mysql/mysqldb.py b/lib/sqlalchemy/dialects/mysql/mysqldb.py index aa8377b27..568c05f62 100644 --- a/lib/sqlalchemy/dialects/mysql/mysqldb.py +++ b/lib/sqlalchemy/dialects/mysql/mysqldb.py @@ -38,6 +38,11 @@ using a URL like the following:: mysql+mysqldb://root@/<dbname>?unix_socket=/cloudsql/<projectid>:<instancename> +Server Side Cursors +------------------- + +The mysqldb dialect supports server-side cursors. See :ref:`mysql_ss_cursors`. + """ from .base import (MySQLDialect, MySQLExecutionContext, @@ -87,6 +92,19 @@ class MySQLDialect_mysqldb(MySQLDialect): statement_compiler = MySQLCompiler_mysqldb preparer = MySQLIdentifierPreparer_mysqldb + def __init__(self, server_side_cursors=False, **kwargs): + super(MySQLDialect_mysqldb, self).__init__(**kwargs) + self.server_side_cursors = server_side_cursors + + @util.langhelpers.memoized_property + def supports_server_side_cursors(self): + try: + cursors = __import__('MySQLdb.cursors').cursors + self._sscursor = cursors.SSCursor + return True + except (ImportError, AttributeError): + return False + @classmethod def dbapi(cls): return __import__('MySQLdb') diff --git a/lib/sqlalchemy/dialects/mysql/pymysql.py b/lib/sqlalchemy/dialects/mysql/pymysql.py index 3c493fbfc..e29c17d8b 100644 --- a/lib/sqlalchemy/dialects/mysql/pymysql.py +++ b/lib/sqlalchemy/dialects/mysql/pymysql.py @@ -30,7 +30,7 @@ to the pymysql driver as well. """ from .mysqldb import MySQLDialect_mysqldb -from ...util import py3k +from ...util import langhelpers, py3k class MySQLDialect_pymysql(MySQLDialect_mysqldb): @@ -44,6 +44,19 @@ class MySQLDialect_pymysql(MySQLDialect_mysqldb): supports_unicode_statements = True supports_unicode_binds = True + def __init__(self, server_side_cursors=False, **kwargs): + super(MySQLDialect_pymysql, self).__init__(**kwargs) + self.server_side_cursors = server_side_cursors + + @langhelpers.memoized_property + def supports_server_side_cursors(self): + try: + cursors = __import__('pymysql.cursors').cursors + self._sscursor = cursors.SSCursor + return True + except (ImportError, AttributeError): + return False + @classmethod def dbapi(cls): return __import__('pymysql') diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 8488da816..27a1ec099 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -28,7 +28,8 @@ psycopg2-specific keyword arguments which are accepted by :class:`~sqlalchemy.engine.ResultProxy` uses special row-buffering behavior when this feature is enabled, such that groups of 100 rows at a time are fetched over the wire to reduce conversational overhead. - Note that the ``stream_results=True`` execution option is a more targeted + Note that the :paramref:`.Connection.execution_options.stream_results` + execution option is a more targeted way of enabling this mode on a per-execution basis. * ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode per connection. True by default. @@ -422,53 +423,24 @@ class _PGUUID(UUID): return value return process -# When we're handed literal SQL, ensure it's a SELECT query. Since -# 8.3, combining cursors and "FOR UPDATE" has been fine. -SERVER_SIDE_CURSOR_RE = re.compile( - r'\s*SELECT', - re.I | re.UNICODE) _server_side_id = util.counter() class PGExecutionContext_psycopg2(PGExecutionContext): - def create_cursor(self): - # TODO: coverage for server side cursors + select.for_update() - - if self.dialect.server_side_cursors: - is_server_side = \ - self.execution_options.get('stream_results', True) and ( - (self.compiled and isinstance(self.compiled.statement, - expression.Selectable) - or - ( - (not self.compiled or - isinstance(self.compiled.statement, - expression.TextClause)) - and self.statement and SERVER_SIDE_CURSOR_RE.match( - self.statement)) - ) - ) - else: - is_server_side = \ - self.execution_options.get('stream_results', False) - - self.__is_server_side = is_server_side - if is_server_side: - # use server-side cursors: - # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html - ident = "c_%s_%s" % (hex(id(self))[2:], - hex(_server_side_id())[2:]) - return self._dbapi_connection.cursor(ident) - else: - return self._dbapi_connection.cursor() + def create_server_side_cursor(self): + # use server-side cursors: + # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html + ident = "c_%s_%s" % (hex(id(self))[2:], + hex(_server_side_id())[2:]) + return self._dbapi_connection.cursor(ident) def get_result_proxy(self): # TODO: ouch if logger.isEnabledFor(logging.INFO): self._log_notices(self.cursor) - if self.__is_server_side: + if self._is_server_side: return _result.BufferedRowResultProxy(self) else: return _result.ResultProxy(self) @@ -502,6 +474,8 @@ class PGDialect_psycopg2(PGDialect): if util.py2k: supports_unicode_statements = False + supports_server_side_cursors = True + default_paramstyle = 'pyformat' # set to true based on psycopg2 version supports_sane_multi_rowcount = False diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 1d23c66b3..f071abaa1 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -295,7 +295,7 @@ class Connection(Connectable): Indicate to the dialect that results should be "streamed" and not pre-buffered, if possible. This is a limitation of many DBAPIs. The flag is currently understood only by the - psycopg2 dialect. + psycopg2, mysqldb and pymysql dialects. :param schema_translate_map: Available on: Connection, Engine. A dictionary mapping schema names to schema names, that will be diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 3ee240383..719178f7e 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -27,6 +27,11 @@ AUTOCOMMIT_REGEXP = re.compile( r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)', re.I | re.UNICODE) +# When we're handed literal SQL, ensure it's a SELECT query +SERVER_SIDE_CURSOR_RE = re.compile( + r'\s*SELECT', + re.I | re.UNICODE) + class DefaultDialect(interfaces.Dialect): """Default implementation of Dialect""" @@ -108,6 +113,8 @@ class DefaultDialect(interfaces.Dialect): supports_empty_insert = True supports_multivalues_insert = False + supports_server_side_cursors = False + server_version_info = None construct_arguments = None @@ -780,8 +787,40 @@ class DefaultExecutionContext(interfaces.ExecutionContext): def should_autocommit_text(self, statement): return AUTOCOMMIT_REGEXP.match(statement) + def _use_server_side_cursor(self): + if not self.dialect.supports_server_side_cursors: + return False + + if self.dialect.server_side_cursors: + use_server_side = \ + self.execution_options.get('stream_results', True) and ( + (self.compiled and isinstance(self.compiled.statement, + expression.Selectable) + or + ( + (not self.compiled or + isinstance(self.compiled.statement, + expression.TextClause)) + and self.statement and SERVER_SIDE_CURSOR_RE.match( + self.statement)) + ) + ) + else: + use_server_side = \ + self.execution_options.get('stream_results', False) + + return use_server_side + def create_cursor(self): - return self._dbapi_connection.cursor() + if self._use_server_side_cursor(): + self._is_server_side = True + return self.create_server_side_cursor() + else: + self._is_server_side = False + return self._dbapi_connection.cursor() + + def create_server_side_cursor(self): + raise NotImplementedError() def pre_exec(self): pass @@ -831,7 +870,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext): pass def get_result_proxy(self): - return result.ResultProxy(self) + if self._is_server_side: + return result.BufferedRowResultProxy(self) + else: + return result.ResultProxy(self) @property def rowcount(self): diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 23d33b0d1..139b61afb 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -751,7 +751,9 @@ class Query(object): :meth:`~sqlalchemy.orm.query.Query.yield_per` will set the ``stream_results`` execution option to True, currently this is only understood by - :mod:`~sqlalchemy.dialects.postgresql.psycopg2` dialect + :mod:`~sqlalchemy.dialects.postgresql.psycopg2`, + :mod:`~sqlalchemy.dialects.mysql.mysqldb` and + :mod:`~sqlalchemy.dialects.mysql.pymysql` dialects which will stream results using server side cursors instead of pre-buffer all rows for this query. Other DBAPIs **pre-buffer all rows** before making them diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index af148a3b9..b001aaf75 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -288,6 +288,14 @@ class SuiteRequirements(Requirements): return exclusions.closed() @property + def server_side_cursors(self): + """Target dialect must support server side cursors.""" + + return exclusions.only_if([ + lambda config: config.db.dialect.supports_server_side_cursors + ], "no server side cursors support") + + @property def sequences(self): """Target database must support SEQUENCEs.""" diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index f40d9a04c..98ddc7efc 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -3,8 +3,9 @@ from ..config import requirements from .. import exclusions from ..assertions import eq_ from .. import engines +from ... import testing -from sqlalchemy import Integer, String, select, util, sql, DateTime +from sqlalchemy import Integer, String, select, util, sql, DateTime, text, func import datetime from ..schema import Table, Column @@ -218,3 +219,149 @@ class PercentSchemaNamesTest(fixtures.TablesTest): ), [(5, 15), (7, 15), (9, 15), (11, 15)] ) + + +class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults): + + __requires__ = ('server_side_cursors', ) + + __backend__ = True + + def _is_server_side(self, cursor): + if self.engine.url.drivername == 'postgresql': + return cursor.name + elif self.engine.url.drivername == 'mysql': + sscursor = __import__('MySQLdb.cursors').cursors.SSCursor + return isinstance(cursor, sscursor) + elif self.engine.url.drivername == 'mysql+pymysql': + sscursor = __import__('pymysql.cursors').cursors.SSCursor + return isinstance(cursor, sscursor) + else: + return False + + def _fixture(self, server_side_cursors): + self.engine = engines.testing_engine( + options={'server_side_cursors': server_side_cursors} + ) + return self.engine + + def tearDown(self): + engines.testing_reaper.close_all() + self.engine.dispose() + + def test_global_string(self): + engine = self._fixture(True) + result = engine.execute('select 1') + assert self._is_server_side(result.cursor) + + def test_global_text(self): + engine = self._fixture(True) + result = engine.execute(text('select 1')) + assert self._is_server_side(result.cursor) + + def test_global_expr(self): + engine = self._fixture(True) + result = engine.execute(select([1])) + assert self._is_server_side(result.cursor) + + def test_global_off_explicit(self): + engine = self._fixture(False) + result = engine.execute(text('select 1')) + + # It should be off globally ... + + assert not self._is_server_side(result.cursor) + + def test_stmt_option(self): + engine = self._fixture(False) + + s = select([1]).execution_options(stream_results=True) + result = engine.execute(s) + + # ... but enabled for this one. + + assert self._is_server_side(result.cursor) + + def test_conn_option(self): + engine = self._fixture(False) + + # and this one + result = \ + engine.connect().execution_options(stream_results=True).\ + execute('select 1' + ) + assert self._is_server_side(result.cursor) + + def test_stmt_enabled_conn_option_disabled(self): + engine = self._fixture(False) + + s = select([1]).execution_options(stream_results=True) + + # not this one + result = \ + engine.connect().execution_options(stream_results=False).\ + execute(s) + assert not self._is_server_side(result.cursor) + + def test_stmt_option_disabled(self): + engine = self._fixture(True) + s = select([1]).execution_options(stream_results=False) + result = engine.execute(s) + assert not self._is_server_side(result.cursor) + + def test_aliases_and_ss(self): + engine = self._fixture(False) + s1 = select([1]).execution_options(stream_results=True).alias() + result = engine.execute(s1) + assert self._is_server_side(result.cursor) + + # s1's options shouldn't affect s2 when s2 is used as a + # from_obj. + s2 = select([1], from_obj=s1) + result = engine.execute(s2) + assert not self._is_server_side(result.cursor) + + def test_for_update_expr(self): + engine = self._fixture(True) + s1 = select([1], for_update=True) + result = engine.execute(s1) + assert self._is_server_side(result.cursor) + + def test_for_update_string(self): + engine = self._fixture(True) + result = engine.execute('SELECT 1 FOR UPDATE') + assert self._is_server_side(result.cursor) + + def test_text_no_ss(self): + engine = self._fixture(False) + s = text('select 42') + result = engine.execute(s) + assert not self._is_server_side(result.cursor) + + def test_text_ss_option(self): + engine = self._fixture(False) + s = text('select 42').execution_options(stream_results=True) + result = engine.execute(s) + assert self._is_server_side(result.cursor) + + @testing.provide_metadata + def test_roundtrip(self): + md = self.metadata + + engine = self._fixture(True) + test_table = Table('test_table', md, + Column('id', Integer, primary_key=True), + Column('data', String(50))) + test_table.create(checkfirst=True) + test_table.insert().execute(data='data1') + test_table.insert().execute(data='data2') + eq_(test_table.select().execute().fetchall(), [(1, 'data1' + ), (2, 'data2')]) + test_table.update().where( + test_table.c.id == 2).values( + data=test_table.c.data + + ' updated').execute() + eq_(test_table.select().execute().fetchall(), + [(1, 'data1'), (2, 'data2 updated')]) + test_table.delete().execute() + eq_(select([func.count('*')]).select_from(test_table).scalar(), 0) |
