summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing
diff options
context:
space:
mode:
authorRoman Podoliaka <roman.podoliaka@gmail.com>2016-11-04 00:31:05 +0200
committerMike Bayer <mike_mp@zzzcomputing.com>2016-11-10 12:09:27 -0500
commitd1e31ab1582e2d9275c70a89b72efc2a8651df3f (patch)
tree36518daa1d1a2468ba2d68e903b233978afbcab2 /lib/sqlalchemy/testing
parent6a688b736429e27a892bc02111414491fe4103b0 (diff)
downloadsqlalchemy-d1e31ab1582e2d9275c70a89b72efc2a8651df3f.tar.gz
Add support for server side cursors to mysqldb and pymysql
This allows to skip buffering of the results on the client side, e.g. the following snippet: table = sa.Table( 'testtbl', sa.MetaData(), sa.Column('id', sa.Integer, primary_key=True), sa.Column('a', sa.Integer), sa.Column('b', sa.String(512)) ) table.create(eng, checkfirst=True) with eng.connect() as conn: result = conn.execute(table.select().limit(1)).fetchone() if result is None: for _ in range(1000): conn.execute( table.insert(), [{'a': random.randint(1, 100000), 'b': ''.join(random.choice(string.ascii_letters) for _ in range(100))} for _ in range(1000)] ) with eng.connect() as conn: for row in conn.execution_options(stream_results=True).execute(table.select()): pass now uses ~23 MB of memory instead of ~327 MB on CPython 3.5.2 and PyMySQL 0.7.9. psycopg2 implementation and execution options (stream_results, server_side_cursors) are reused. Change-Id: I4dc23ce3094f027bdff51b896b050361991c62e2
Diffstat (limited to 'lib/sqlalchemy/testing')
-rw-r--r--lib/sqlalchemy/testing/requirements.py8
-rw-r--r--lib/sqlalchemy/testing/suite/test_results.py149
2 files changed, 156 insertions, 1 deletions
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index af148a3b9..b001aaf75 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -288,6 +288,14 @@ class SuiteRequirements(Requirements):
return exclusions.closed()
@property
+ def server_side_cursors(self):
+ """Target dialect must support server side cursors."""
+
+ return exclusions.only_if([
+ lambda config: config.db.dialect.supports_server_side_cursors
+ ], "no server side cursors support")
+
+ @property
def sequences(self):
"""Target database must support SEQUENCEs."""
diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py
index f40d9a04c..98ddc7efc 100644
--- a/lib/sqlalchemy/testing/suite/test_results.py
+++ b/lib/sqlalchemy/testing/suite/test_results.py
@@ -3,8 +3,9 @@ from ..config import requirements
from .. import exclusions
from ..assertions import eq_
from .. import engines
+from ... import testing
-from sqlalchemy import Integer, String, select, util, sql, DateTime
+from sqlalchemy import Integer, String, select, util, sql, DateTime, text, func
import datetime
from ..schema import Table, Column
@@ -218,3 +219,149 @@ class PercentSchemaNamesTest(fixtures.TablesTest):
),
[(5, 15), (7, 15), (9, 15), (11, 15)]
)
+
+
+class ServerSideCursorsTest(fixtures.TestBase, testing.AssertsExecutionResults):
+
+ __requires__ = ('server_side_cursors', )
+
+ __backend__ = True
+
+ def _is_server_side(self, cursor):
+ if self.engine.url.drivername == 'postgresql':
+ return cursor.name
+ elif self.engine.url.drivername == 'mysql':
+ sscursor = __import__('MySQLdb.cursors').cursors.SSCursor
+ return isinstance(cursor, sscursor)
+ elif self.engine.url.drivername == 'mysql+pymysql':
+ sscursor = __import__('pymysql.cursors').cursors.SSCursor
+ return isinstance(cursor, sscursor)
+ else:
+ return False
+
+ def _fixture(self, server_side_cursors):
+ self.engine = engines.testing_engine(
+ options={'server_side_cursors': server_side_cursors}
+ )
+ return self.engine
+
+ def tearDown(self):
+ engines.testing_reaper.close_all()
+ self.engine.dispose()
+
+ def test_global_string(self):
+ engine = self._fixture(True)
+ result = engine.execute('select 1')
+ assert self._is_server_side(result.cursor)
+
+ def test_global_text(self):
+ engine = self._fixture(True)
+ result = engine.execute(text('select 1'))
+ assert self._is_server_side(result.cursor)
+
+ def test_global_expr(self):
+ engine = self._fixture(True)
+ result = engine.execute(select([1]))
+ assert self._is_server_side(result.cursor)
+
+ def test_global_off_explicit(self):
+ engine = self._fixture(False)
+ result = engine.execute(text('select 1'))
+
+ # It should be off globally ...
+
+ assert not self._is_server_side(result.cursor)
+
+ def test_stmt_option(self):
+ engine = self._fixture(False)
+
+ s = select([1]).execution_options(stream_results=True)
+ result = engine.execute(s)
+
+ # ... but enabled for this one.
+
+ assert self._is_server_side(result.cursor)
+
+ def test_conn_option(self):
+ engine = self._fixture(False)
+
+ # and this one
+ result = \
+ engine.connect().execution_options(stream_results=True).\
+ execute('select 1'
+ )
+ assert self._is_server_side(result.cursor)
+
+ def test_stmt_enabled_conn_option_disabled(self):
+ engine = self._fixture(False)
+
+ s = select([1]).execution_options(stream_results=True)
+
+ # not this one
+ result = \
+ engine.connect().execution_options(stream_results=False).\
+ execute(s)
+ assert not self._is_server_side(result.cursor)
+
+ def test_stmt_option_disabled(self):
+ engine = self._fixture(True)
+ s = select([1]).execution_options(stream_results=False)
+ result = engine.execute(s)
+ assert not self._is_server_side(result.cursor)
+
+ def test_aliases_and_ss(self):
+ engine = self._fixture(False)
+ s1 = select([1]).execution_options(stream_results=True).alias()
+ result = engine.execute(s1)
+ assert self._is_server_side(result.cursor)
+
+ # s1's options shouldn't affect s2 when s2 is used as a
+ # from_obj.
+ s2 = select([1], from_obj=s1)
+ result = engine.execute(s2)
+ assert not self._is_server_side(result.cursor)
+
+ def test_for_update_expr(self):
+ engine = self._fixture(True)
+ s1 = select([1], for_update=True)
+ result = engine.execute(s1)
+ assert self._is_server_side(result.cursor)
+
+ def test_for_update_string(self):
+ engine = self._fixture(True)
+ result = engine.execute('SELECT 1 FOR UPDATE')
+ assert self._is_server_side(result.cursor)
+
+ def test_text_no_ss(self):
+ engine = self._fixture(False)
+ s = text('select 42')
+ result = engine.execute(s)
+ assert not self._is_server_side(result.cursor)
+
+ def test_text_ss_option(self):
+ engine = self._fixture(False)
+ s = text('select 42').execution_options(stream_results=True)
+ result = engine.execute(s)
+ assert self._is_server_side(result.cursor)
+
+ @testing.provide_metadata
+ def test_roundtrip(self):
+ md = self.metadata
+
+ engine = self._fixture(True)
+ test_table = Table('test_table', md,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)))
+ test_table.create(checkfirst=True)
+ test_table.insert().execute(data='data1')
+ test_table.insert().execute(data='data2')
+ eq_(test_table.select().execute().fetchall(), [(1, 'data1'
+ ), (2, 'data2')])
+ test_table.update().where(
+ test_table.c.id == 2).values(
+ data=test_table.c.data +
+ ' updated').execute()
+ eq_(test_table.select().execute().fetchall(),
+ [(1, 'data1'), (2, 'data2 updated')])
+ test_table.delete().execute()
+ eq_(select([func.count('*')]).select_from(test_table).scalar(), 0)