diff options
| author | Roman Podoliaka <roman.podoliaka@gmail.com> | 2016-11-04 00:31:05 +0200 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2016-11-10 12:09:27 -0500 |
| commit | d1e31ab1582e2d9275c70a89b72efc2a8651df3f (patch) | |
| tree | 36518daa1d1a2468ba2d68e903b233978afbcab2 /lib/sqlalchemy/testing | |
| parent | 6a688b736429e27a892bc02111414491fe4103b0 (diff) | |
| download | sqlalchemy-d1e31ab1582e2d9275c70a89b72efc2a8651df3f.tar.gz | |
Add support for server side cursors to mysqldb and pymysql
This allows to skip buffering of the results on the client side, e.g.
the following snippet:
table = sa.Table(
'testtbl', sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('a', sa.Integer),
sa.Column('b', sa.String(512))
)
table.create(eng, checkfirst=True)
with eng.connect() as conn:
result = conn.execute(table.select().limit(1)).fetchone()
if result is None:
for _ in range(1000):
conn.execute(
table.insert(),
[{'a': random.randint(1, 100000),
'b': ''.join(random.choice(string.ascii_letters) for _ in range(100))}
for _ in range(1000)]
)
with eng.connect() as conn:
for row in conn.execution_options(stream_results=True).execute(table.select()):
pass
now uses ~23 MB of memory instead of ~327 MB on CPython 3.5.2 and
PyMySQL 0.7.9.
psycopg2 implementation and execution options (stream_results,
server_side_cursors) are reused.
Change-Id: I4dc23ce3094f027bdff51b896b050361991c62e2
Diffstat (limited to 'lib/sqlalchemy/testing')
| -rw-r--r-- | lib/sqlalchemy/testing/requirements.py | 8 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_results.py | 149 |
2 files changed, 156 insertions, 1 deletions
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index af148a3b9..b001aaf75 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -288,6 +288,14 @@ class SuiteRequirements(Requirements): return exclusions.closed() @property + def server_side_cursors(self): + """Target dialect must support server side cursors.""" + + return exclusions.only_if([ + lambda config: config.db.dialect.supports_server_side_cursors + ], "no server side cursors support") + + @property def sequences(self): """Target database must support SEQUENCEs.""" diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index f40d9a04c..98ddc7efc 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -3,8 +3,9 @@ from ..config import requirements from .. import exclusions from ..assertions import eq_ from .. import engines +from ... import testing -from sqlalchemy import Integer, String, select, util, sql, DateTime +from sqlalchemy import Integer, String, select, util, sql, DateTime, text, func import datetime from ..schema import Table, Column @@ -218,3 +219,149 @@ class PercentSchemaNamesTest(fixtures.TablesTest): ), [(5, 15), (7, 15), (9, 15), (11, 15)] ) + + +class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults): + + __requires__ = ('server_side_cursors', ) + + __backend__ = True + + def _is_server_side(self, cursor): + if self.engine.url.drivername == 'postgresql': + return cursor.name + elif self.engine.url.drivername == 'mysql': + sscursor = __import__('MySQLdb.cursors').cursors.SSCursor + return isinstance(cursor, sscursor) + elif self.engine.url.drivername == 'mysql+pymysql': + sscursor = __import__('pymysql.cursors').cursors.SSCursor + return isinstance(cursor, sscursor) + else: + return False + + def _fixture(self, server_side_cursors): + self.engine = engines.testing_engine( + options={'server_side_cursors': server_side_cursors} + ) + return self.engine + + def tearDown(self): + engines.testing_reaper.close_all() + self.engine.dispose() + + def test_global_string(self): + engine = self._fixture(True) + result = engine.execute('select 1') + assert self._is_server_side(result.cursor) + + def test_global_text(self): + engine = self._fixture(True) + result = engine.execute(text('select 1')) + assert self._is_server_side(result.cursor) + + def test_global_expr(self): + engine = self._fixture(True) + result = engine.execute(select([1])) + assert self._is_server_side(result.cursor) + + def test_global_off_explicit(self): + engine = self._fixture(False) + result = engine.execute(text('select 1')) + + # It should be off globally ... + + assert not self._is_server_side(result.cursor) + + def test_stmt_option(self): + engine = self._fixture(False) + + s = select([1]).execution_options(stream_results=True) + result = engine.execute(s) + + # ... but enabled for this one. + + assert self._is_server_side(result.cursor) + + def test_conn_option(self): + engine = self._fixture(False) + + # and this one + result = \ + engine.connect().execution_options(stream_results=True).\ + execute('select 1' + ) + assert self._is_server_side(result.cursor) + + def test_stmt_enabled_conn_option_disabled(self): + engine = self._fixture(False) + + s = select([1]).execution_options(stream_results=True) + + # not this one + result = \ + engine.connect().execution_options(stream_results=False).\ + execute(s) + assert not self._is_server_side(result.cursor) + + def test_stmt_option_disabled(self): + engine = self._fixture(True) + s = select([1]).execution_options(stream_results=False) + result = engine.execute(s) + assert not self._is_server_side(result.cursor) + + def test_aliases_and_ss(self): + engine = self._fixture(False) + s1 = select([1]).execution_options(stream_results=True).alias() + result = engine.execute(s1) + assert self._is_server_side(result.cursor) + + # s1's options shouldn't affect s2 when s2 is used as a + # from_obj. + s2 = select([1], from_obj=s1) + result = engine.execute(s2) + assert not self._is_server_side(result.cursor) + + def test_for_update_expr(self): + engine = self._fixture(True) + s1 = select([1], for_update=True) + result = engine.execute(s1) + assert self._is_server_side(result.cursor) + + def test_for_update_string(self): + engine = self._fixture(True) + result = engine.execute('SELECT 1 FOR UPDATE') + assert self._is_server_side(result.cursor) + + def test_text_no_ss(self): + engine = self._fixture(False) + s = text('select 42') + result = engine.execute(s) + assert not self._is_server_side(result.cursor) + + def test_text_ss_option(self): + engine = self._fixture(False) + s = text('select 42').execution_options(stream_results=True) + result = engine.execute(s) + assert self._is_server_side(result.cursor) + + @testing.provide_metadata + def test_roundtrip(self): + md = self.metadata + + engine = self._fixture(True) + test_table = Table('test_table', md, + Column('id', Integer, primary_key=True), + Column('data', String(50))) + test_table.create(checkfirst=True) + test_table.insert().execute(data='data1') + test_table.insert().execute(data='data2') + eq_(test_table.select().execute().fetchall(), [(1, 'data1' + ), (2, 'data2')]) + test_table.update().where( + test_table.c.id == 2).values( + data=test_table.c.data + + ' updated').execute() + eq_(test_table.select().execute().fetchall(), + [(1, 'data1'), (2, 'data2 updated')]) + test_table.delete().execute() + eq_(select([func.count('*')]).select_from(test_table).scalar(), 0) |
